summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2021-12-06 22:25:46 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2021-12-07 21:25:38 +0100
commit709a4a3698cb7cf280dbcc63acf2824d88d1a8c6 (patch)
tree5fc0f2da8410b796e46e9ebdb91a3319c5f2fd9a
parentd7a8706c3409d7666aa08620e3ae5982797bf0c9 (diff)
Storage: head watch with selector and unwatch function
-rw-r--r--src/Network.hs2
-rw-r--r--src/Storage.hs46
-rw-r--r--src/Storage/Internal.hs18
3 files changed, 51 insertions, 15 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 1edc70c..26f1db3 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -293,7 +293,7 @@ startServer opt origHead logd' services = do
sendToPeerS peer packet
_ -> return ()
- watchHead origHead $ \h -> do
+ void $ watchHead origHead $ \h -> do
let idt = headLocalIdentity h
changedId <- modifyMVar midentity $ \cur ->
return (idt, cur /= idt)
diff --git a/src/Storage.hs b/src/Storage.hs
index e0f0c7a..5ef9bec 100644
--- a/src/Storage.hs
+++ b/src/Storage.hs
@@ -21,7 +21,9 @@ module Storage (
headId, headRef, headObject, headStoredObject,
loadHeads, loadHead, reloadHead,
storeHead, replaceHead, updateHead, updateHead_,
- watchHead,
+
+ WatchedHead,
+ watchHead, watchHeadWith, unwatchHead,
Storable(..), ZeroStorable(..),
StorableText(..), StorableDate(..), StorableUUID(..),
@@ -111,13 +113,13 @@ openStorage :: FilePath -> IO Storage
openStorage path = do
createDirectoryIfMissing True $ path ++ "/objects"
createDirectoryIfMissing True $ path ++ "/heads"
- watchers <- newMVar ([], [])
+ watchers <- newMVar ([], WatchList 1 [])
refgen <- newMVar =<< HT.new
return $ Storage { stBacking = StorageDir path watchers, stParent = Nothing, stRefGeneration = refgen }
memoryStorage' :: IO (Storage' c')
memoryStorage' = do
- backing <- StorageMemory <$> newMVar [] <*> newMVar M.empty <*> newMVar M.empty <*> newMVar []
+ backing <- StorageMemory <$> newMVar [] <*> newMVar M.empty <*> newMVar M.empty <*> newMVar (WatchList 1 [])
refgen <- newMVar =<< HT.new
return $ Storage { stBacking = backing, stParent = Nothing, stRefGeneration = refgen }
@@ -439,7 +441,7 @@ replaceHead prev@(Head hid pobj) stored = do
StorageMemory { memHeads = theads, memWatchers = twatch } -> do
res <- modifyMVar theads $ \hs -> do
- ws <- map snd . filter ((==(tid, hid)) . fst) <$> readMVar twatch
+ ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch
return $ case partition ((==(tid, hid)) . fst) hs of
([] , _ ) -> (hs, Left Nothing)
((_, r):_, hs') | r == storedRef pobj -> (((tid, hid), storedRef stored) : hs',
@@ -460,12 +462,25 @@ updateHead h f = do
updateHead_ :: HeadType a => Head a -> (Stored a -> IO (Stored a)) -> IO (Maybe (Head a))
updateHead_ h = fmap fst . updateHead h . (fmap (,()) .)
-watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO ()
-watchHead (Head hid (Stored (Ref st _) _)) cb = do
- let cb' = cb . Head hid . wrappedLoad
- tid = headTypeID @a Proxy
+
+data WatchedHead = WatchedHead Storage WatchID
+
+watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHead
+watchHead h = watchHeadWith h id
+
+watchHeadWith :: forall a b. HeadType a => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead
+watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do
+ let tid = headTypeID @a Proxy
+ addWatcher wl = (wl', WatchedHead st (wlNext wl))
+ where wl' = wl { wlNext = wlNext wl + 1
+ , wlList = WatchListItem
+ { wlID = wlNext wl
+ , wlHead = (tid, hid)
+ , wlFun = cb . sel . Head hid . wrappedLoad
+ } : wlList wl
+ }
case stBacking st of
- StorageDir { dirPath = spath, dirWatchers = mvar } -> modifyMVar_ mvar $ \(ilist, watchers) -> do
+ StorageDir { dirPath = spath, dirWatchers = mvar } -> modifyMVar mvar $ \(ilist, wl) -> do
ilist' <- case lookup tid ilist of
Just _ -> return ilist
Nothing -> do
@@ -473,13 +488,20 @@ watchHead (Head hid (Stored (Ref st _) _)) cb = do
void $ addWatch inotify [Move] (BC.pack $ headTypePath spath tid) $ \case
MovedIn { filePath = fpath } | Just ihid <- HeadID <$> U.fromASCIIBytes fpath -> do
loadHead @a st ihid >>= \case
- Just h -> mapM_ ($ headRef h) . map snd . filter ((== (tid, ihid)) . fst) . snd =<< readMVar mvar
+ Just h -> mapM_ ($ headRef h) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar
Nothing -> return ()
_ -> return ()
return $ (tid, inotify) : ilist
- return (ilist', ((tid, hid), cb') : watchers)
+ return $ first (ilist',) $ addWatcher wl
- StorageMemory { memWatchers = mvar } -> modifyMVar_ mvar $ return . (((tid, hid), cb') :)
+ StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher
+
+unwatchHead :: WatchedHead -> IO ()
+unwatchHead (WatchedHead st wid) = do
+ let delWatcher wl = wl { wlList = filter ((/=wid) . wlID) $ wlList wl }
+ case stBacking st of
+ StorageDir { dirWatchers = mvar } -> modifyMVar_ mvar $ return . second delWatcher
+ StorageMemory { memWatchers = mvar } -> modifyMVar_ mvar $ return . delWatcher
class Storable a where
diff --git a/src/Storage/Internal.hs b/src/Storage/Internal.hs
index d589c44..7e593f8 100644
--- a/src/Storage/Internal.hs
+++ b/src/Storage/Internal.hs
@@ -59,15 +59,29 @@ showParentStorage Storage { stParent = Just st } = "@" ++ show st
data StorageBacking c
= StorageDir { dirPath :: FilePath
- , dirWatchers :: MVar ([(HeadTypeID, INotify)], [((HeadTypeID, HeadID), Ref' c -> IO ())])
+ , dirWatchers :: MVar ([(HeadTypeID, INotify)], WatchList c)
}
| StorageMemory { memHeads :: MVar [((HeadTypeID, HeadID), Ref' c)]
, memObjs :: MVar (Map RefDigest BL.ByteString)
, memKeys :: MVar (Map RefDigest ScrubbedBytes)
- , memWatchers :: MVar [((HeadTypeID, HeadID), Ref' c -> IO ())]
+ , memWatchers :: MVar (WatchList c)
}
deriving (Eq)
+newtype WatchID = WatchID Int
+ deriving (Eq, Ord, Num)
+
+data WatchList c = WatchList
+ { wlNext :: WatchID
+ , wlList :: [WatchListItem c]
+ }
+
+data WatchListItem c = WatchListItem
+ { wlID :: WatchID
+ , wlHead :: (HeadTypeID, HeadID)
+ , wlFun :: Ref' c -> IO ()
+ }
+
newtype RefDigest = RefDigest (Digest Blake2b_256)
deriving (Eq, Ord, NFData, ByteArrayAccess)