From 709a4a3698cb7cf280dbcc63acf2824d88d1a8c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Mon, 6 Dec 2021 22:25:46 +0100 Subject: Storage: head watch with selector and unwatch function --- src/Network.hs | 2 +- src/Storage.hs | 46 ++++++++++++++++++++++++++++++++++------------ src/Storage/Internal.hs | 18 ++++++++++++++++-- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/src/Network.hs b/src/Network.hs index 1edc70c..26f1db3 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -293,7 +293,7 @@ startServer opt origHead logd' services = do sendToPeerS peer packet _ -> return () - watchHead origHead $ \h -> do + void $ watchHead origHead $ \h -> do let idt = headLocalIdentity h changedId <- modifyMVar midentity $ \cur -> return (idt, cur /= idt) diff --git a/src/Storage.hs b/src/Storage.hs index e0f0c7a..5ef9bec 100644 --- a/src/Storage.hs +++ b/src/Storage.hs @@ -21,7 +21,9 @@ module Storage ( headId, headRef, headObject, headStoredObject, loadHeads, loadHead, reloadHead, storeHead, replaceHead, updateHead, updateHead_, - watchHead, + + WatchedHead, + watchHead, watchHeadWith, unwatchHead, Storable(..), ZeroStorable(..), StorableText(..), StorableDate(..), StorableUUID(..), @@ -111,13 +113,13 @@ openStorage :: FilePath -> IO Storage openStorage path = do createDirectoryIfMissing True $ path ++ "/objects" createDirectoryIfMissing True $ path ++ "/heads" - watchers <- newMVar ([], []) + watchers <- newMVar ([], WatchList 1 []) refgen <- newMVar =<< HT.new return $ Storage { stBacking = StorageDir path watchers, stParent = Nothing, stRefGeneration = refgen } memoryStorage' :: IO (Storage' c') memoryStorage' = do - backing <- StorageMemory <$> newMVar [] <*> newMVar M.empty <*> newMVar M.empty <*> newMVar [] + backing <- StorageMemory <$> newMVar [] <*> newMVar M.empty <*> newMVar M.empty <*> newMVar (WatchList 1 []) refgen <- newMVar =<< HT.new return $ Storage { stBacking = backing, stParent = Nothing, stRefGeneration = refgen } @@ -439,7 +441,7 @@ replaceHead prev@(Head hid pobj) stored = do StorageMemory { memHeads = theads, memWatchers = twatch } -> do res <- modifyMVar theads $ \hs -> do - ws <- map snd . filter ((==(tid, hid)) . fst) <$> readMVar twatch + ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch return $ case partition ((==(tid, hid)) . fst) hs of ([] , _ ) -> (hs, Left Nothing) ((_, r):_, hs') | r == storedRef pobj -> (((tid, hid), storedRef stored) : hs', @@ -460,12 +462,25 @@ updateHead h f = do updateHead_ :: HeadType a => Head a -> (Stored a -> IO (Stored a)) -> IO (Maybe (Head a)) updateHead_ h = fmap fst . updateHead h . (fmap (,()) .) -watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO () -watchHead (Head hid (Stored (Ref st _) _)) cb = do - let cb' = cb . Head hid . wrappedLoad - tid = headTypeID @a Proxy + +data WatchedHead = WatchedHead Storage WatchID + +watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHead +watchHead h = watchHeadWith h id + +watchHeadWith :: forall a b. HeadType a => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead +watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do + let tid = headTypeID @a Proxy + addWatcher wl = (wl', WatchedHead st (wlNext wl)) + where wl' = wl { wlNext = wlNext wl + 1 + , wlList = WatchListItem + { wlID = wlNext wl + , wlHead = (tid, hid) + , wlFun = cb . sel . Head hid . wrappedLoad + } : wlList wl + } case stBacking st of - StorageDir { dirPath = spath, dirWatchers = mvar } -> modifyMVar_ mvar $ \(ilist, watchers) -> do + StorageDir { dirPath = spath, dirWatchers = mvar } -> modifyMVar mvar $ \(ilist, wl) -> do ilist' <- case lookup tid ilist of Just _ -> return ilist Nothing -> do @@ -473,13 +488,20 @@ watchHead (Head hid (Stored (Ref st _) _)) cb = do void $ addWatch inotify [Move] (BC.pack $ headTypePath spath tid) $ \case MovedIn { filePath = fpath } | Just ihid <- HeadID <$> U.fromASCIIBytes fpath -> do loadHead @a st ihid >>= \case - Just h -> mapM_ ($ headRef h) . map snd . filter ((== (tid, ihid)) . fst) . snd =<< readMVar mvar + Just h -> mapM_ ($ headRef h) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar Nothing -> return () _ -> return () return $ (tid, inotify) : ilist - return (ilist', ((tid, hid), cb') : watchers) + return $ first (ilist',) $ addWatcher wl - StorageMemory { memWatchers = mvar } -> modifyMVar_ mvar $ return . (((tid, hid), cb') :) + StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher + +unwatchHead :: WatchedHead -> IO () +unwatchHead (WatchedHead st wid) = do + let delWatcher wl = wl { wlList = filter ((/=wid) . wlID) $ wlList wl } + case stBacking st of + StorageDir { dirWatchers = mvar } -> modifyMVar_ mvar $ return . second delWatcher + StorageMemory { memWatchers = mvar } -> modifyMVar_ mvar $ return . delWatcher class Storable a where diff --git a/src/Storage/Internal.hs b/src/Storage/Internal.hs index d589c44..7e593f8 100644 --- a/src/Storage/Internal.hs +++ b/src/Storage/Internal.hs @@ -59,15 +59,29 @@ showParentStorage Storage { stParent = Just st } = "@" ++ show st data StorageBacking c = StorageDir { dirPath :: FilePath - , dirWatchers :: MVar ([(HeadTypeID, INotify)], [((HeadTypeID, HeadID), Ref' c -> IO ())]) + , dirWatchers :: MVar ([(HeadTypeID, INotify)], WatchList c) } | StorageMemory { memHeads :: MVar [((HeadTypeID, HeadID), Ref' c)] , memObjs :: MVar (Map RefDigest BL.ByteString) , memKeys :: MVar (Map RefDigest ScrubbedBytes) - , memWatchers :: MVar [((HeadTypeID, HeadID), Ref' c -> IO ())] + , memWatchers :: MVar (WatchList c) } deriving (Eq) +newtype WatchID = WatchID Int + deriving (Eq, Ord, Num) + +data WatchList c = WatchList + { wlNext :: WatchID + , wlList :: [WatchListItem c] + } + +data WatchListItem c = WatchListItem + { wlID :: WatchID + , wlHead :: (HeadTypeID, HeadID) + , wlFun :: Ref' c -> IO () + } + newtype RefDigest = RefDigest (Digest Blake2b_256) deriving (Eq, Ord, NFData, ByteArrayAccess) -- cgit v1.2.3