diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-06-29 22:17:52 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-06-30 15:42:55 +0200 |
commit | fb2f418a6b2b00f5b1f032547bb7e47749a23b80 (patch) | |
tree | 47852ce8264cfe9ddaca9c372244e46de6a89fa2 | |
parent | a16efeb30c3d68479e609196e6e1320c89acc6a6 (diff) |
Storage watching tests with multiple heads and readers
-rw-r--r-- | erebos.cabal | 1 | ||||
-rw-r--r-- | main/Test.hs | 61 | ||||
-rw-r--r-- | src/Erebos/Storage.hs | 74 | ||||
-rw-r--r-- | test/storage.test | 228 |
4 files changed, 336 insertions, 28 deletions
diff --git a/erebos.cabal b/erebos.cabal index 45e6526..8a8f0bc 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -195,3 +195,4 @@ executable erebos text, time, transformers >= 0.5 && <0.7, + uuid, diff --git a/main/Test.hs b/main/Test.hs index a957f4b..d5737c2 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -23,6 +23,7 @@ import Data.Text qualified as T import Data.Text.Encoding import Data.Text.IO qualified as T import Data.Typeable +import Data.UUID qualified as U import Network.Socket @@ -51,6 +52,8 @@ import Test.Service data TestState = TestState { tsHead :: Maybe (Head LocalState) , tsServer :: Maybe RunningServer + , tsWatchedHeads :: [ ( Int, WatchedHead ) ] + , tsWatchedHeadNext :: Int , tsWatchedLocalIdentity :: Maybe WatchedHead , tsWatchedSharedIdentity :: Maybe WatchedHead } @@ -65,6 +68,8 @@ initTestState :: TestState initTestState = TestState { tsHead = Nothing , tsServer = Nothing + , tsWatchedHeads = [] + , tsWatchedHeadNext = 1 , tsWatchedLocalIdentity = Nothing , tsWatchedSharedIdentity = Nothing } @@ -243,6 +248,10 @@ commands = map (T.pack *** id) , ("stored-roots", cmdStoredRoots) , ("stored-set-add", cmdStoredSetAdd) , ("stored-set-list", cmdStoredSetList) + , ("head-create", cmdHeadCreate) + , ("head-replace", cmdHeadReplace) + , ("head-watch", cmdHeadWatch) + , ("head-unwatch", cmdHeadUnwatch) , ("create-identity", cmdCreateIdentity) , ("start-server", cmdStartServer) , ("stop-server", cmdStopServer) @@ -321,6 +330,58 @@ cmdStoredSetList = do cmdOut $ "stored-set-item" ++ concatMap ((' ':) . show . refDigest . storedRef) item cmdOut $ "stored-set-done" +cmdHeadCreate :: Command +cmdHeadCreate = do + [ ttid, tref ] <- asks tiParams + st <- asks tiStorage + Just tid <- return $ fromUUID <$> U.fromText ttid + Just ref <- liftIO $ readRef st (encodeUtf8 tref) + + h <- storeHeadRaw st tid ref + cmdOut $ unwords $ [ "head-create-done", show (toUUID tid), show (toUUID h) ] + +cmdHeadReplace :: Command +cmdHeadReplace = do + [ ttid, thid, told, tnew ] <- asks tiParams + st <- asks tiStorage + Just tid <- return $ fmap fromUUID $ U.fromText ttid + Just hid <- return $ fmap fromUUID $ U.fromText thid + Just old <- liftIO $ readRef st (encodeUtf8 told) + Just new <- liftIO $ readRef st (encodeUtf8 tnew) + + replaceHeadRaw st tid hid old new >>= cmdOut . unwords . \case + Left Nothing -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ] + Left (Just r) -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew, show (refDigest r) ] + Right _ -> [ "head-replace-done", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ] + +cmdHeadWatch :: Command +cmdHeadWatch = do + [ ttid, thid ] <- asks tiParams + st <- asks tiStorage + Just tid <- return $ fmap fromUUID $ U.fromText ttid + Just hid <- return $ fmap fromUUID $ U.fromText thid + + out <- asks tiOutput + wid <- gets tsWatchedHeadNext + + watched <- liftIO $ watchHeadRaw st tid hid id $ \r -> do + outLine out $ unwords [ "head-watch-cb", show wid, show $ refDigest r ] + + modify $ \s -> s + { tsWatchedHeads = ( wid, watched ) : tsWatchedHeads s + , tsWatchedHeadNext = wid + 1 + } + + cmdOut $ unwords $ [ "head-watch-done", T.unpack ttid, T.unpack thid, show wid ] + +cmdHeadUnwatch :: Command +cmdHeadUnwatch = do + [ twid ] <- asks tiParams + let wid = read (T.unpack twid) + Just watched <- lookup wid <$> gets tsWatchedHeads + liftIO $ unwatchHead watched + cmdOut $ unwords [ "head-unwatch-done", show wid ] + initTestHead :: Head LocalState -> Command initTestHead h = do _ <- liftIO . watchReceivedMessages h . dmReceivedWatcher =<< asks tiOutput diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index 95ef649..6526f40 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -21,9 +21,11 @@ module Erebos.Storage ( headId, headStorage, headRef, headObject, headStoredObject, loadHeads, loadHead, reloadHead, storeHead, replaceHead, updateHead, updateHead_, + loadHeadRaw, storeHeadRaw, replaceHeadRaw, WatchedHead, watchHead, watchHeadWith, unwatchHead, + watchHeadRaw, MonadStorage(..), @@ -62,7 +64,6 @@ module Erebos.Storage ( ) where import Control.Applicative -import Control.Arrow import Control.Concurrent import Control.Exception import Control.Monad @@ -72,6 +73,7 @@ import Control.Monad.Writer import Crypto.Hash +import Data.Bifunctor import Data.ByteString (ByteString) import qualified Data.ByteArray as BA import qualified Data.ByteString as B @@ -436,57 +438,70 @@ loadHeads Storage { stBacking = StorageMemory { memHeads = theads } } = liftIO $ catMaybes . map toHead <$> readMVar theads loadHead :: forall a m. (HeadType a, MonadIO m) => Storage -> HeadID -> m (Maybe (Head a)) -loadHead s@(Storage { stBacking = StorageDir { dirPath = spath }}) hid = liftIO $ do +loadHead st hid = fmap (Head hid . wrappedLoad) <$> loadHeadRaw st (headTypeID @a Proxy) hid + +loadHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> m (Maybe Ref) +loadHeadRaw s@(Storage { stBacking = StorageDir { dirPath = spath }}) tid hid = liftIO $ do handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ do - (h:_) <- BC.lines <$> B.readFile (headPath spath (headTypeID @a Proxy) hid) + (h:_) <- BC.lines <$> B.readFile (headPath spath tid hid) Just ref <- readRef s h - return $ Just $ Head hid $ wrappedLoad ref -loadHead Storage { stBacking = StorageMemory { memHeads = theads } } hid = liftIO $ do - fmap (Head hid . wrappedLoad) . lookup (headTypeID @a Proxy, hid) <$> readMVar theads + return $ Just ref +loadHeadRaw Storage { stBacking = StorageMemory { memHeads = theads } } tid hid = liftIO $ do + lookup (tid, hid) <$> readMVar theads reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) -storeHead st obj = liftIO $ do +storeHead st obj = do let tid = headTypeID @a Proxy - hid <- HeadID <$> U.nextRandom stored <- wrappedStore st obj + hid <- storeHeadRaw st tid (storedRef stored) + return $ Head hid stored + +storeHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> Ref -> m HeadID +storeHeadRaw st tid ref = liftIO $ do + hid <- HeadID <$> U.nextRandom case stBacking st of StorageDir { dirPath = spath } -> do Right () <- writeFileChecked (headPath spath tid hid) Nothing $ - showRef (storedRef stored) `B.append` BC.singleton '\n' + showRef ref `B.append` BC.singleton '\n' return () StorageMemory { memHeads = theads } -> do - modifyMVar_ theads $ return . (((tid, hid), storedRef stored) :) - return $ Head hid stored + modifyMVar_ theads $ return . (((tid, hid), ref) :) + return hid replaceHead :: forall a m. (HeadType a, MonadIO m) => Head a -> Stored a -> m (Either (Maybe (Head a)) (Head a)) replaceHead prev@(Head hid pobj) stored' = liftIO $ do let st = headStorage prev tid = headTypeID @a Proxy stored <- copyStored st stored' + bimap (fmap $ Head hid . wrappedLoad) (const $ Head hid stored) <$> + replaceHeadRaw st tid hid (storedRef pobj) (storedRef stored) + +replaceHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> Ref -> Ref -> m (Either (Maybe Ref) Ref) +replaceHeadRaw st tid hid prev new = liftIO $ do case stBacking st of StorageDir { dirPath = spath } -> do let filename = headPath spath tid hid showRefL r = showRef r `B.append` BC.singleton '\n' - writeFileChecked filename (Just $ showRefL $ headRef prev) (showRefL $ storedRef stored) >>= \case + writeFileChecked filename (Just $ showRefL prev) (showRefL new) >>= \case Left Nothing -> return $ Left Nothing Left (Just bs) -> do Just oref <- readRef st $ BC.takeWhile (/='\n') bs - return $ Left $ Just $ Head hid $ wrappedLoad oref - Right () -> return $ Right $ Head hid stored + return $ Left $ Just oref + Right () -> return $ Right new StorageMemory { memHeads = theads, memWatchers = twatch } -> do res <- modifyMVar theads $ \hs -> do ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch return $ case partition ((==(tid, hid)) . fst) hs of ([] , _ ) -> (hs, Left Nothing) - ((_, r):_, hs') | r == storedRef pobj -> (((tid, hid), storedRef stored) : hs', - Right (Head hid stored, ws)) - | otherwise -> (hs, Left $ Just $ Head hid $ wrappedLoad r) + ((_, r):_, hs') | r == prev -> (((tid, hid), new) : hs', + Right (new, ws)) + | otherwise -> (hs, Left $ Just r) case res of - Right (h, ws) -> mapM_ ($ headRef h) ws >> return (Right h) + Right (r, ws) -> mapM_ ($ r) ws >> return (Right r) Left x -> return $ Left x updateHead :: (HeadType a, MonadIO m) => Head a -> (Stored a -> m (Stored a, b)) -> m (Maybe (Head a), b) @@ -507,19 +522,22 @@ watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHe watchHead h = watchHeadWith h id watchHeadWith :: forall a b. (HeadType a, Eq b) => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead -watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do +watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do + watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb + +watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead +watchHeadRaw st tid hid sel cb = do memo <- newEmptyMVar - let tid = headTypeID @a Proxy - addWatcher wl = (wl', WatchedHead st (wlNext wl) memo) + let addWatcher wl = (wl', WatchedHead st (wlNext wl) memo) where wl' = wl { wlNext = wlNext wl + 1 , wlList = WatchListItem { wlID = wlNext wl , wlHead = (tid, hid) , wlFun = \r -> do - let x = sel $ Head hid $ wrappedLoad r + let x = sel r modifyMVar_ memo $ \prev -> do - when (x /= prev) $ cb x - return x + when (Just x /= prev) $ cb x + return $ Just x } : wlList wl } @@ -531,8 +549,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do inotify <- initINotify void $ addWatch inotify [Move] (BC.pack $ headTypePath spath tid) $ \case MovedIn { filePath = fpath } | Just ihid <- HeadID <$> U.fromASCIIBytes fpath -> do - loadHead @a st ihid >>= \case - Just h -> mapM_ ($ headRef h) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar + loadHeadRaw st tid ihid >>= \case + Just ref -> mapM_ ($ ref) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar Nothing -> return () _ -> return () return $ (tid, inotify) : ilist @@ -540,8 +558,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher - cur <- sel . maybe oh id <$> reloadHead oh - cb cur + cur <- fmap sel <$> loadHeadRaw st tid hid + maybe (return ()) cb cur putMVar memo cur return watched diff --git a/test/storage.test b/test/storage.test index 9bf468e..0369807 100644 --- a/test/storage.test +++ b/test/storage.test @@ -154,6 +154,234 @@ test Storage: test StorageWatcher: spawn as p1 + spawn as p2 on p1.node + + send to p1: + "store rec" + "text:n 1" + "" + expect from p1: + /store-done (blake2#[0-9a-f]*)/ capture r1 + + send to p1: + "store rec" + "text:n 2" + "" + expect from p1: + /store-done (blake2#[0-9a-f]*)/ capture r2 + + send to p1: + "store rec" + "text:n 3" + "" + expect from p1: + /store-done (blake2#[0-9a-f]*)/ capture r3 + + send to p1: + "store rec" + "text:n 4" + "" + expect from p1: + /store-done (blake2#[0-9a-f]*)/ capture r4 + + let tid1 = "00000000-0000-0000-0000-000000000001" + let tid2 = "00000000-0000-0000-0000-000000000002" + + # Watch head from multiple processes + + send "head-create $tid1 $r1" to p1 + expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1 + + send "head-watch $tid1 $hid1" to p1 + send "head-watch $tid1 $hid1" to p2 + + expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1 + expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p2 capture w2_1 + + local: + expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r1) + + local: + expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r1) + + send "head-replace $tid1 $hid1 $r2 $r1" to p1 + expect /head-replace-fail $tid1 $hid1 $r2 $r1 $r1/ from p1 + + send "head-replace $tid1 $hid1 $r1 $r2" to p1 + expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1 + + local: + expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r2) + + local: + expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r2) + + # Watch distinct heads and head types + + send "head-create $tid1 $r1" to p1 + expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1_2 + + send "head-create $tid2 $r2" to p1 + expect /head-create-done $tid2 ([0-9a-f-]+)/ from p1 capture hid2 + + send "head-watch $tid1 $hid1_2" to p1 + send "head-watch $tid1 $hid1_2" to p2 + send "head-watch $tid2 $hid2" to p1 + send "head-watch $tid2 $hid2" to p2 + + expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p1 capture w1_1_2 + expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p2 capture w2_1_2 + expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p1 capture w1_2 + expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p2 capture w2_2 + + local: + expect /head-watch-cb $w1_1_2 (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r1) + + local: + expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r1) + + local: + expect /head-watch-cb $w1_2 (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r2) + + local: + expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r2) + + send "head-replace $tid1 $hid1_2 $r1 $r2" to p1 + expect /head-replace-done $tid1 $hid1_2 $r1 $r2/ from p1 + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new + guard (w == w1_1_2) + guard (new == r2) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new + guard (w == w2_1_2) + guard (new == r2) + + send "head-replace $tid2 $hid2 $r2 $r3" to p1 + expect /head-replace-done $tid2 $hid2 $r2 $r3/ from p1 + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new + guard (w == w1_2) + guard (new == r3) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new + guard (w == w2_2) + guard (new == r3) + + # Unwatch head + + send "head-unwatch $w1_1_2" to p1 + expect /head-unwatch-done $w1_1_2/ from p1 + + send "head-unwatch $w1_2" to p1 + expect /head-unwatch-done $w1_2/ from p1 + + send "head-replace $tid2 $hid2 $r3 $r4" to p1 + expect /head-replace-done $tid2 $hid2 $r3 $r4/ from p1 + + send "head-replace $tid1 $hid1_2 $r2 $r1" to p1 + expect /head-replace-done $tid1 $hid1_2 $r2 $r1/ from p1 + + send "head-replace $tid1 $hid1 $r2 $r3" to p1 + expect /head-replace-done $tid1 $hid1 $r2 $r3/ from p1 + + local: + expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r4) + + local: + expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r1) + + local: + expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r3) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new + guard (w == w1_1) + guard (new == r3) + + # Multiple watchers on the same head + + send "head-watch $tid1 $hid1" to p1 + expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1b + + local: + expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r3) + + send "head-watch $tid1 $hid1" to p1 + expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1c + + local: + expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r3) + + send "head-replace $tid1 $hid1 $r3 $r4" to p1 + expect /head-replace-done $tid1 $hid1 $r3 $r4/ from p1 + + local: + expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r4) + + local: + expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r4) + + local: + expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new + guard (new == r4) + + local: + expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new + guard (new == r4) + + for w in [ w1_1, w1_1c ]: + send "head-unwatch $w" to p1 + expect /head-unwatch-done $w/ from p1 + + send "head-replace $tid1 $hid1 $r4 $r1" to p1 + expect /head-replace-done $tid1 $hid1 $r4 $r1/ from p1 + + send "head-replace $tid1 $hid1 $r1 $r2" to p1 + expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1 + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new + guard (w == w2_1) + guard (new == r1) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new + guard (w == w2_1) + guard (new == r2) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new + guard (w == w1_1b) + guard (new == r1) + + local: + expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new + guard (w == w1_1b) + guard (new == r2) + + +test SharedStateWatcher: + spawn as p1 spawn as p2 send "create-identity Device1 Owner" to p1 send "create-identity Device2" to p2 |