diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2024-06-29 22:17:52 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-06-30 15:42:55 +0200 | 
| commit | fb2f418a6b2b00f5b1f032547bb7e47749a23b80 (patch) | |
| tree | 47852ce8264cfe9ddaca9c372244e46de6a89fa2 | |
| parent | a16efeb30c3d68479e609196e6e1320c89acc6a6 (diff) | |
Storage watching tests with multiple heads and readers
| -rw-r--r-- | erebos.cabal | 1 | ||||
| -rw-r--r-- | main/Test.hs | 61 | ||||
| -rw-r--r-- | src/Erebos/Storage.hs | 74 | ||||
| -rw-r--r-- | test/storage.test | 228 | 
4 files changed, 336 insertions, 28 deletions
| diff --git a/erebos.cabal b/erebos.cabal index 45e6526..8a8f0bc 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -195,3 +195,4 @@ executable erebos          text,          time,          transformers >= 0.5 && <0.7, +        uuid, diff --git a/main/Test.hs b/main/Test.hs index a957f4b..d5737c2 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -23,6 +23,7 @@ import Data.Text qualified as T  import Data.Text.Encoding  import Data.Text.IO qualified as T  import Data.Typeable +import Data.UUID qualified as U  import Network.Socket @@ -51,6 +52,8 @@ import Test.Service  data TestState = TestState      { tsHead :: Maybe (Head LocalState)      , tsServer :: Maybe RunningServer +    , tsWatchedHeads :: [ ( Int, WatchedHead ) ] +    , tsWatchedHeadNext :: Int      , tsWatchedLocalIdentity :: Maybe WatchedHead      , tsWatchedSharedIdentity :: Maybe WatchedHead      } @@ -65,6 +68,8 @@ initTestState :: TestState  initTestState = TestState      { tsHead = Nothing      , tsServer = Nothing +    , tsWatchedHeads = [] +    , tsWatchedHeadNext = 1      , tsWatchedLocalIdentity = Nothing      , tsWatchedSharedIdentity = Nothing      } @@ -243,6 +248,10 @@ commands = map (T.pack *** id)      , ("stored-roots", cmdStoredRoots)      , ("stored-set-add", cmdStoredSetAdd)      , ("stored-set-list", cmdStoredSetList) +    , ("head-create", cmdHeadCreate) +    , ("head-replace", cmdHeadReplace) +    , ("head-watch", cmdHeadWatch) +    , ("head-unwatch", cmdHeadUnwatch)      , ("create-identity", cmdCreateIdentity)      , ("start-server", cmdStartServer)      , ("stop-server", cmdStopServer) @@ -321,6 +330,58 @@ cmdStoredSetList = do          cmdOut $ "stored-set-item" ++ concatMap ((' ':) . show . refDigest . storedRef) item      cmdOut $ "stored-set-done" +cmdHeadCreate :: Command +cmdHeadCreate = do +    [ ttid, tref ] <- asks tiParams +    st <- asks tiStorage +    Just tid <- return $ fromUUID <$> U.fromText ttid +    Just ref <- liftIO $ readRef st (encodeUtf8 tref) + +    h <- storeHeadRaw st tid ref +    cmdOut $ unwords $ [ "head-create-done", show (toUUID tid), show (toUUID h) ] + +cmdHeadReplace :: Command +cmdHeadReplace = do +    [ ttid, thid, told, tnew ] <- asks tiParams +    st <- asks tiStorage +    Just tid <- return $ fmap fromUUID $ U.fromText ttid +    Just hid <- return $ fmap fromUUID $ U.fromText thid +    Just old <- liftIO $ readRef st (encodeUtf8 told) +    Just new <- liftIO $ readRef st (encodeUtf8 tnew) + +    replaceHeadRaw st tid hid old new >>= cmdOut . unwords . \case +        Left Nothing  -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ] +        Left (Just r) -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew, show (refDigest r) ] +        Right _       -> [ "head-replace-done", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ] + +cmdHeadWatch :: Command +cmdHeadWatch = do +    [ ttid, thid ] <- asks tiParams +    st <- asks tiStorage +    Just tid <- return $ fmap fromUUID $ U.fromText ttid +    Just hid <- return $ fmap fromUUID $ U.fromText thid + +    out <- asks tiOutput +    wid <- gets tsWatchedHeadNext + +    watched <- liftIO $ watchHeadRaw st tid hid id $ \r -> do +        outLine out $ unwords [ "head-watch-cb", show wid, show $ refDigest r ] + +    modify $ \s -> s +        { tsWatchedHeads = ( wid, watched ) : tsWatchedHeads s +        , tsWatchedHeadNext = wid + 1 +        } + +    cmdOut $ unwords $ [ "head-watch-done", T.unpack ttid, T.unpack thid, show wid ] + +cmdHeadUnwatch :: Command +cmdHeadUnwatch = do +    [ twid ] <- asks tiParams +    let wid = read (T.unpack twid) +    Just watched <- lookup wid <$> gets tsWatchedHeads +    liftIO $ unwatchHead watched +    cmdOut $ unwords [ "head-unwatch-done", show wid ] +  initTestHead :: Head LocalState -> Command  initTestHead h = do      _ <- liftIO . watchReceivedMessages h . dmReceivedWatcher =<< asks tiOutput diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index 95ef649..6526f40 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -21,9 +21,11 @@ module Erebos.Storage (      headId, headStorage, headRef, headObject, headStoredObject,      loadHeads, loadHead, reloadHead,      storeHead, replaceHead, updateHead, updateHead_, +    loadHeadRaw, storeHeadRaw, replaceHeadRaw,      WatchedHead,      watchHead, watchHeadWith, unwatchHead, +    watchHeadRaw,      MonadStorage(..), @@ -62,7 +64,6 @@ module Erebos.Storage (  ) where  import Control.Applicative -import Control.Arrow  import Control.Concurrent  import Control.Exception  import Control.Monad @@ -72,6 +73,7 @@ import Control.Monad.Writer  import Crypto.Hash +import Data.Bifunctor  import Data.ByteString (ByteString)  import qualified Data.ByteArray as BA  import qualified Data.ByteString as B @@ -436,57 +438,70 @@ loadHeads Storage { stBacking = StorageMemory { memHeads = theads } } = liftIO $      catMaybes . map toHead <$> readMVar theads  loadHead :: forall a m. (HeadType a, MonadIO m) => Storage -> HeadID -> m (Maybe (Head a)) -loadHead s@(Storage { stBacking = StorageDir { dirPath = spath }}) hid = liftIO $ do +loadHead st hid = fmap (Head hid . wrappedLoad) <$> loadHeadRaw st (headTypeID @a Proxy) hid + +loadHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> m (Maybe Ref) +loadHeadRaw s@(Storage { stBacking = StorageDir { dirPath = spath }}) tid hid = liftIO $ do      handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ do -        (h:_) <- BC.lines <$> B.readFile (headPath spath (headTypeID @a Proxy) hid) +        (h:_) <- BC.lines <$> B.readFile (headPath spath tid hid)          Just ref <- readRef s h -        return $ Just $ Head hid $ wrappedLoad ref -loadHead Storage { stBacking = StorageMemory { memHeads = theads } } hid = liftIO $ do -    fmap (Head hid . wrappedLoad) . lookup (headTypeID @a Proxy, hid) <$> readMVar theads +        return $ Just ref +loadHeadRaw Storage { stBacking = StorageMemory { memHeads = theads } } tid hid = liftIO $ do +    lookup (tid, hid) <$> readMVar theads  reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a))  reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid  storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) -storeHead st obj = liftIO $ do +storeHead st obj = do      let tid = headTypeID @a Proxy -    hid <- HeadID <$> U.nextRandom      stored <- wrappedStore st obj +    hid <- storeHeadRaw st tid (storedRef stored) +    return $ Head hid stored + +storeHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> Ref -> m HeadID +storeHeadRaw st tid ref = liftIO $ do +    hid <- HeadID <$> U.nextRandom      case stBacking st of           StorageDir { dirPath = spath } -> do               Right () <- writeFileChecked (headPath spath tid hid) Nothing $ -                 showRef (storedRef stored) `B.append` BC.singleton '\n' +                 showRef ref `B.append` BC.singleton '\n'               return ()           StorageMemory { memHeads = theads } -> do -             modifyMVar_ theads $ return . (((tid, hid), storedRef stored) :) -    return $ Head hid stored +             modifyMVar_ theads $ return . (((tid, hid), ref) :) +    return hid  replaceHead :: forall a m. (HeadType a, MonadIO m) => Head a -> Stored a -> m (Either (Maybe (Head a)) (Head a))  replaceHead prev@(Head hid pobj) stored' = liftIO $ do      let st = headStorage prev          tid = headTypeID @a Proxy      stored <- copyStored st stored' +    bimap (fmap $ Head hid . wrappedLoad) (const $ Head hid stored) <$> +        replaceHeadRaw st tid hid (storedRef pobj) (storedRef stored) + +replaceHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> Ref -> Ref -> m (Either (Maybe Ref) Ref) +replaceHeadRaw st tid hid prev new = liftIO $ do      case stBacking st of           StorageDir { dirPath = spath } -> do               let filename = headPath spath tid hid                   showRefL r = showRef r `B.append` BC.singleton '\n' -             writeFileChecked filename (Just $ showRefL $ headRef prev) (showRefL $ storedRef stored) >>= \case +             writeFileChecked filename (Just $ showRefL prev) (showRefL new) >>= \case                   Left Nothing -> return $ Left Nothing                   Left (Just bs) -> do Just oref <- readRef st $ BC.takeWhile (/='\n') bs -                                      return $ Left $ Just $ Head hid $ wrappedLoad oref -                 Right () -> return $ Right $ Head hid stored +                                      return $ Left $ Just oref +                 Right () -> return $ Right new           StorageMemory { memHeads = theads, memWatchers = twatch } -> do               res <- modifyMVar theads $ \hs -> do                   ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch                   return $ case partition ((==(tid, hid)) . fst) hs of                       ([] , _  ) -> (hs, Left Nothing) -                     ((_, r):_, hs') | r == storedRef pobj -> (((tid, hid), storedRef stored) : hs', -                                                                  Right (Head hid stored, ws)) -                                     | otherwise -> (hs, Left $ Just $ Head hid $ wrappedLoad r) +                     ((_, r):_, hs') | r == prev -> (((tid, hid), new) : hs', +                                                                  Right (new, ws)) +                                     | otherwise -> (hs, Left $ Just r)               case res of -                  Right (h, ws) -> mapM_ ($ headRef h) ws >> return (Right h) +                  Right (r, ws) -> mapM_ ($ r) ws >> return (Right r)                    Left x -> return $ Left x  updateHead :: (HeadType a, MonadIO m) => Head a -> (Stored a -> m (Stored a, b)) -> m (Maybe (Head a), b) @@ -507,19 +522,22 @@ watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHe  watchHead h = watchHeadWith h id  watchHeadWith :: forall a b. (HeadType a, Eq b) => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead -watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do +watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do +    watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb + +watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead +watchHeadRaw st tid hid sel cb = do      memo <- newEmptyMVar -    let tid = headTypeID @a Proxy -        addWatcher wl = (wl', WatchedHead st (wlNext wl) memo) +    let addWatcher wl = (wl', WatchedHead st (wlNext wl) memo)              where wl' = wl { wlNext = wlNext wl + 1                             , wlList = WatchListItem                                 { wlID = wlNext wl                                 , wlHead = (tid, hid)                                 , wlFun = \r -> do -                                   let x = sel $ Head hid $ wrappedLoad r +                                   let x = sel r                                     modifyMVar_ memo $ \prev -> do -                                       when (x /= prev) $ cb x -                                       return x +                                       when (Just x /= prev) $ cb x +                                       return $ Just x                                 } : wlList wl                             } @@ -531,8 +549,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do                       inotify <- initINotify                       void $ addWatch inotify [Move] (BC.pack $ headTypePath spath tid) $ \case                           MovedIn { filePath = fpath } | Just ihid <- HeadID <$> U.fromASCIIBytes fpath -> do -                             loadHead @a st ihid >>= \case -                                 Just h -> mapM_ ($ headRef h) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar +                             loadHeadRaw st tid ihid >>= \case +                                 Just ref -> mapM_ ($ ref) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar                                   Nothing -> return ()                           _ -> return ()                       return $ (tid, inotify) : ilist @@ -540,8 +558,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do           StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher -    cur <- sel . maybe oh id <$> reloadHead oh -    cb cur +    cur <- fmap sel <$> loadHeadRaw st tid hid +    maybe (return ()) cb cur      putMVar memo cur      return watched diff --git a/test/storage.test b/test/storage.test index 9bf468e..0369807 100644 --- a/test/storage.test +++ b/test/storage.test @@ -154,6 +154,234 @@ test Storage:  test StorageWatcher:  	spawn as p1 +	spawn as p2 on p1.node + +	send to p1: +		"store rec" +		"text:n 1" +		"" +	expect from p1: +		/store-done (blake2#[0-9a-f]*)/ capture r1 + +	send to p1: +		"store rec" +		"text:n 2" +		"" +	expect from p1: +		/store-done (blake2#[0-9a-f]*)/ capture r2 + +	send to p1: +		"store rec" +		"text:n 3" +		"" +	expect from p1: +		/store-done (blake2#[0-9a-f]*)/ capture r3 + +	send to p1: +		"store rec" +		"text:n 4" +		"" +	expect from p1: +		/store-done (blake2#[0-9a-f]*)/ capture r4 + +	let tid1 = "00000000-0000-0000-0000-000000000001" +	let tid2 = "00000000-0000-0000-0000-000000000002" + +	# Watch head from multiple processes + +	send "head-create $tid1 $r1" to p1 +	expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1 + +	send "head-watch $tid1 $hid1" to p1 +	send "head-watch $tid1 $hid1" to p2 + +	expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1 +	expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p2 capture w2_1 + +	local: +		expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r1) + +	local: +		expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r1) + +	send "head-replace $tid1 $hid1 $r2 $r1" to p1 +	expect /head-replace-fail $tid1 $hid1 $r2 $r1 $r1/ from p1 + +	send "head-replace $tid1 $hid1 $r1 $r2" to p1 +	expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1 + +	local: +		expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r2) + +	local: +		expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r2) + +	# Watch distinct heads and head types + +	send "head-create $tid1 $r1" to p1 +	expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1_2 + +	send "head-create $tid2 $r2" to p1 +	expect /head-create-done $tid2 ([0-9a-f-]+)/ from p1 capture hid2 + +	send "head-watch $tid1 $hid1_2" to p1 +	send "head-watch $tid1 $hid1_2" to p2 +	send "head-watch $tid2 $hid2" to p1 +	send "head-watch $tid2 $hid2" to p2 + +	expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p1 capture w1_1_2 +	expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p2 capture w2_1_2 +	expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p1 capture w1_2 +	expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p2 capture w2_2 + +	local: +		expect /head-watch-cb $w1_1_2 (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r1) + +	local: +		expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r1) + +	local: +		expect /head-watch-cb $w1_2 (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r2) + +	local: +		expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r2) + +	send "head-replace $tid1 $hid1_2 $r1 $r2" to p1 +	expect /head-replace-done $tid1 $hid1_2 $r1 $r2/ from p1 + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new +		guard (w == w1_1_2) +		guard (new == r2) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new +		guard (w == w2_1_2) +		guard (new == r2) + +	send "head-replace $tid2 $hid2 $r2 $r3" to p1 +	expect /head-replace-done $tid2 $hid2 $r2 $r3/ from p1 + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new +		guard (w == w1_2) +		guard (new == r3) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new +		guard (w == w2_2) +		guard (new == r3) + +	# Unwatch head + +	send "head-unwatch $w1_1_2" to p1 +	expect /head-unwatch-done $w1_1_2/ from p1 + +	send "head-unwatch $w1_2" to p1 +	expect /head-unwatch-done $w1_2/ from p1 + +	send "head-replace $tid2 $hid2 $r3 $r4" to p1 +	expect /head-replace-done $tid2 $hid2 $r3 $r4/ from p1 + +	send "head-replace $tid1 $hid1_2 $r2 $r1" to p1 +	expect /head-replace-done $tid1 $hid1_2 $r2 $r1/ from p1 + +	send "head-replace $tid1 $hid1 $r2 $r3" to p1 +	expect /head-replace-done $tid1 $hid1 $r2 $r3/ from p1 + +	local: +		expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r4) + +	local: +		expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r1) + +	local: +		expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r3) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new +		guard (w == w1_1) +		guard (new == r3) + +	# Multiple watchers on the same head + +	send "head-watch $tid1 $hid1" to p1 +	expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1b + +	local: +		expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r3) + +	send "head-watch $tid1 $hid1" to p1 +	expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1c + +	local: +		expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r3) + +	send "head-replace $tid1 $hid1 $r3 $r4" to p1 +	expect /head-replace-done $tid1 $hid1 $r3 $r4/ from p1 + +	local: +		expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r4) + +	local: +		expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r4) + +	local: +		expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new +		guard (new == r4) + +	local: +		expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new +		guard (new == r4) + +	for w in [ w1_1, w1_1c ]: +		send "head-unwatch $w" to p1 +		expect /head-unwatch-done $w/ from p1 + +	send "head-replace $tid1 $hid1 $r4 $r1" to p1 +	expect /head-replace-done $tid1 $hid1 $r4 $r1/ from p1 + +	send "head-replace $tid1 $hid1 $r1 $r2" to p1 +	expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1 + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new +		guard (w == w2_1) +		guard (new == r1) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new +		guard (w == w2_1) +		guard (new == r2) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new +		guard (w == w1_1b) +		guard (new == r1) + +	local: +		expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new +		guard (w == w1_1b) +		guard (new == r2) + + +test SharedStateWatcher: +	spawn as p1  	spawn as p2  	send "create-identity Device1 Owner" to p1  	send "create-identity Device2" to p2 |