summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--erebos.cabal1
-rw-r--r--main/Test.hs61
-rw-r--r--src/Erebos/Storage.hs74
-rw-r--r--test/storage.test228
4 files changed, 336 insertions, 28 deletions
diff --git a/erebos.cabal b/erebos.cabal
index 45e6526..8a8f0bc 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -195,3 +195,4 @@ executable erebos
text,
time,
transformers >= 0.5 && <0.7,
+ uuid,
diff --git a/main/Test.hs b/main/Test.hs
index a957f4b..d5737c2 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -23,6 +23,7 @@ import Data.Text qualified as T
import Data.Text.Encoding
import Data.Text.IO qualified as T
import Data.Typeable
+import Data.UUID qualified as U
import Network.Socket
@@ -51,6 +52,8 @@ import Test.Service
data TestState = TestState
{ tsHead :: Maybe (Head LocalState)
, tsServer :: Maybe RunningServer
+ , tsWatchedHeads :: [ ( Int, WatchedHead ) ]
+ , tsWatchedHeadNext :: Int
, tsWatchedLocalIdentity :: Maybe WatchedHead
, tsWatchedSharedIdentity :: Maybe WatchedHead
}
@@ -65,6 +68,8 @@ initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
, tsServer = Nothing
+ , tsWatchedHeads = []
+ , tsWatchedHeadNext = 1
, tsWatchedLocalIdentity = Nothing
, tsWatchedSharedIdentity = Nothing
}
@@ -243,6 +248,10 @@ commands = map (T.pack *** id)
, ("stored-roots", cmdStoredRoots)
, ("stored-set-add", cmdStoredSetAdd)
, ("stored-set-list", cmdStoredSetList)
+ , ("head-create", cmdHeadCreate)
+ , ("head-replace", cmdHeadReplace)
+ , ("head-watch", cmdHeadWatch)
+ , ("head-unwatch", cmdHeadUnwatch)
, ("create-identity", cmdCreateIdentity)
, ("start-server", cmdStartServer)
, ("stop-server", cmdStopServer)
@@ -321,6 +330,58 @@ cmdStoredSetList = do
cmdOut $ "stored-set-item" ++ concatMap ((' ':) . show . refDigest . storedRef) item
cmdOut $ "stored-set-done"
+cmdHeadCreate :: Command
+cmdHeadCreate = do
+ [ ttid, tref ] <- asks tiParams
+ st <- asks tiStorage
+ Just tid <- return $ fromUUID <$> U.fromText ttid
+ Just ref <- liftIO $ readRef st (encodeUtf8 tref)
+
+ h <- storeHeadRaw st tid ref
+ cmdOut $ unwords $ [ "head-create-done", show (toUUID tid), show (toUUID h) ]
+
+cmdHeadReplace :: Command
+cmdHeadReplace = do
+ [ ttid, thid, told, tnew ] <- asks tiParams
+ st <- asks tiStorage
+ Just tid <- return $ fmap fromUUID $ U.fromText ttid
+ Just hid <- return $ fmap fromUUID $ U.fromText thid
+ Just old <- liftIO $ readRef st (encodeUtf8 told)
+ Just new <- liftIO $ readRef st (encodeUtf8 tnew)
+
+ replaceHeadRaw st tid hid old new >>= cmdOut . unwords . \case
+ Left Nothing -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ]
+ Left (Just r) -> [ "head-replace-fail", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew, show (refDigest r) ]
+ Right _ -> [ "head-replace-done", T.unpack ttid, T.unpack thid, T.unpack told, T.unpack tnew ]
+
+cmdHeadWatch :: Command
+cmdHeadWatch = do
+ [ ttid, thid ] <- asks tiParams
+ st <- asks tiStorage
+ Just tid <- return $ fmap fromUUID $ U.fromText ttid
+ Just hid <- return $ fmap fromUUID $ U.fromText thid
+
+ out <- asks tiOutput
+ wid <- gets tsWatchedHeadNext
+
+ watched <- liftIO $ watchHeadRaw st tid hid id $ \r -> do
+ outLine out $ unwords [ "head-watch-cb", show wid, show $ refDigest r ]
+
+ modify $ \s -> s
+ { tsWatchedHeads = ( wid, watched ) : tsWatchedHeads s
+ , tsWatchedHeadNext = wid + 1
+ }
+
+ cmdOut $ unwords $ [ "head-watch-done", T.unpack ttid, T.unpack thid, show wid ]
+
+cmdHeadUnwatch :: Command
+cmdHeadUnwatch = do
+ [ twid ] <- asks tiParams
+ let wid = read (T.unpack twid)
+ Just watched <- lookup wid <$> gets tsWatchedHeads
+ liftIO $ unwatchHead watched
+ cmdOut $ unwords [ "head-unwatch-done", show wid ]
+
initTestHead :: Head LocalState -> Command
initTestHead h = do
_ <- liftIO . watchReceivedMessages h . dmReceivedWatcher =<< asks tiOutput
diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs
index 95ef649..6526f40 100644
--- a/src/Erebos/Storage.hs
+++ b/src/Erebos/Storage.hs
@@ -21,9 +21,11 @@ module Erebos.Storage (
headId, headStorage, headRef, headObject, headStoredObject,
loadHeads, loadHead, reloadHead,
storeHead, replaceHead, updateHead, updateHead_,
+ loadHeadRaw, storeHeadRaw, replaceHeadRaw,
WatchedHead,
watchHead, watchHeadWith, unwatchHead,
+ watchHeadRaw,
MonadStorage(..),
@@ -62,7 +64,6 @@ module Erebos.Storage (
) where
import Control.Applicative
-import Control.Arrow
import Control.Concurrent
import Control.Exception
import Control.Monad
@@ -72,6 +73,7 @@ import Control.Monad.Writer
import Crypto.Hash
+import Data.Bifunctor
import Data.ByteString (ByteString)
import qualified Data.ByteArray as BA
import qualified Data.ByteString as B
@@ -436,57 +438,70 @@ loadHeads Storage { stBacking = StorageMemory { memHeads = theads } } = liftIO $
catMaybes . map toHead <$> readMVar theads
loadHead :: forall a m. (HeadType a, MonadIO m) => Storage -> HeadID -> m (Maybe (Head a))
-loadHead s@(Storage { stBacking = StorageDir { dirPath = spath }}) hid = liftIO $ do
+loadHead st hid = fmap (Head hid . wrappedLoad) <$> loadHeadRaw st (headTypeID @a Proxy) hid
+
+loadHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> m (Maybe Ref)
+loadHeadRaw s@(Storage { stBacking = StorageDir { dirPath = spath }}) tid hid = liftIO $ do
handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ do
- (h:_) <- BC.lines <$> B.readFile (headPath spath (headTypeID @a Proxy) hid)
+ (h:_) <- BC.lines <$> B.readFile (headPath spath tid hid)
Just ref <- readRef s h
- return $ Just $ Head hid $ wrappedLoad ref
-loadHead Storage { stBacking = StorageMemory { memHeads = theads } } hid = liftIO $ do
- fmap (Head hid . wrappedLoad) . lookup (headTypeID @a Proxy, hid) <$> readMVar theads
+ return $ Just ref
+loadHeadRaw Storage { stBacking = StorageMemory { memHeads = theads } } tid hid = liftIO $ do
+ lookup (tid, hid) <$> readMVar theads
reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a))
reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid
storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a)
-storeHead st obj = liftIO $ do
+storeHead st obj = do
let tid = headTypeID @a Proxy
- hid <- HeadID <$> U.nextRandom
stored <- wrappedStore st obj
+ hid <- storeHeadRaw st tid (storedRef stored)
+ return $ Head hid stored
+
+storeHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> Ref -> m HeadID
+storeHeadRaw st tid ref = liftIO $ do
+ hid <- HeadID <$> U.nextRandom
case stBacking st of
StorageDir { dirPath = spath } -> do
Right () <- writeFileChecked (headPath spath tid hid) Nothing $
- showRef (storedRef stored) `B.append` BC.singleton '\n'
+ showRef ref `B.append` BC.singleton '\n'
return ()
StorageMemory { memHeads = theads } -> do
- modifyMVar_ theads $ return . (((tid, hid), storedRef stored) :)
- return $ Head hid stored
+ modifyMVar_ theads $ return . (((tid, hid), ref) :)
+ return hid
replaceHead :: forall a m. (HeadType a, MonadIO m) => Head a -> Stored a -> m (Either (Maybe (Head a)) (Head a))
replaceHead prev@(Head hid pobj) stored' = liftIO $ do
let st = headStorage prev
tid = headTypeID @a Proxy
stored <- copyStored st stored'
+ bimap (fmap $ Head hid . wrappedLoad) (const $ Head hid stored) <$>
+ replaceHeadRaw st tid hid (storedRef pobj) (storedRef stored)
+
+replaceHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> Ref -> Ref -> m (Either (Maybe Ref) Ref)
+replaceHeadRaw st tid hid prev new = liftIO $ do
case stBacking st of
StorageDir { dirPath = spath } -> do
let filename = headPath spath tid hid
showRefL r = showRef r `B.append` BC.singleton '\n'
- writeFileChecked filename (Just $ showRefL $ headRef prev) (showRefL $ storedRef stored) >>= \case
+ writeFileChecked filename (Just $ showRefL prev) (showRefL new) >>= \case
Left Nothing -> return $ Left Nothing
Left (Just bs) -> do Just oref <- readRef st $ BC.takeWhile (/='\n') bs
- return $ Left $ Just $ Head hid $ wrappedLoad oref
- Right () -> return $ Right $ Head hid stored
+ return $ Left $ Just oref
+ Right () -> return $ Right new
StorageMemory { memHeads = theads, memWatchers = twatch } -> do
res <- modifyMVar theads $ \hs -> do
ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch
return $ case partition ((==(tid, hid)) . fst) hs of
([] , _ ) -> (hs, Left Nothing)
- ((_, r):_, hs') | r == storedRef pobj -> (((tid, hid), storedRef stored) : hs',
- Right (Head hid stored, ws))
- | otherwise -> (hs, Left $ Just $ Head hid $ wrappedLoad r)
+ ((_, r):_, hs') | r == prev -> (((tid, hid), new) : hs',
+ Right (new, ws))
+ | otherwise -> (hs, Left $ Just r)
case res of
- Right (h, ws) -> mapM_ ($ headRef h) ws >> return (Right h)
+ Right (r, ws) -> mapM_ ($ r) ws >> return (Right r)
Left x -> return $ Left x
updateHead :: (HeadType a, MonadIO m) => Head a -> (Stored a -> m (Stored a, b)) -> m (Maybe (Head a), b)
@@ -507,19 +522,22 @@ watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHe
watchHead h = watchHeadWith h id
watchHeadWith :: forall a b. (HeadType a, Eq b) => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead
-watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do
+watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do
+ watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb
+
+watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead
+watchHeadRaw st tid hid sel cb = do
memo <- newEmptyMVar
- let tid = headTypeID @a Proxy
- addWatcher wl = (wl', WatchedHead st (wlNext wl) memo)
+ let addWatcher wl = (wl', WatchedHead st (wlNext wl) memo)
where wl' = wl { wlNext = wlNext wl + 1
, wlList = WatchListItem
{ wlID = wlNext wl
, wlHead = (tid, hid)
, wlFun = \r -> do
- let x = sel $ Head hid $ wrappedLoad r
+ let x = sel r
modifyMVar_ memo $ \prev -> do
- when (x /= prev) $ cb x
- return x
+ when (Just x /= prev) $ cb x
+ return $ Just x
} : wlList wl
}
@@ -531,8 +549,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do
inotify <- initINotify
void $ addWatch inotify [Move] (BC.pack $ headTypePath spath tid) $ \case
MovedIn { filePath = fpath } | Just ihid <- HeadID <$> U.fromASCIIBytes fpath -> do
- loadHead @a st ihid >>= \case
- Just h -> mapM_ ($ headRef h) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar
+ loadHeadRaw st tid ihid >>= \case
+ Just ref -> mapM_ ($ ref) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList . snd =<< readMVar mvar
Nothing -> return ()
_ -> return ()
return $ (tid, inotify) : ilist
@@ -540,8 +558,8 @@ watchHeadWith oh@(Head hid (Stored (Ref st _) _)) sel cb = do
StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher
- cur <- sel . maybe oh id <$> reloadHead oh
- cb cur
+ cur <- fmap sel <$> loadHeadRaw st tid hid
+ maybe (return ()) cb cur
putMVar memo cur
return watched
diff --git a/test/storage.test b/test/storage.test
index 9bf468e..0369807 100644
--- a/test/storage.test
+++ b/test/storage.test
@@ -154,6 +154,234 @@ test Storage:
test StorageWatcher:
spawn as p1
+ spawn as p2 on p1.node
+
+ send to p1:
+ "store rec"
+ "text:n 1"
+ ""
+ expect from p1:
+ /store-done (blake2#[0-9a-f]*)/ capture r1
+
+ send to p1:
+ "store rec"
+ "text:n 2"
+ ""
+ expect from p1:
+ /store-done (blake2#[0-9a-f]*)/ capture r2
+
+ send to p1:
+ "store rec"
+ "text:n 3"
+ ""
+ expect from p1:
+ /store-done (blake2#[0-9a-f]*)/ capture r3
+
+ send to p1:
+ "store rec"
+ "text:n 4"
+ ""
+ expect from p1:
+ /store-done (blake2#[0-9a-f]*)/ capture r4
+
+ let tid1 = "00000000-0000-0000-0000-000000000001"
+ let tid2 = "00000000-0000-0000-0000-000000000002"
+
+ # Watch head from multiple processes
+
+ send "head-create $tid1 $r1" to p1
+ expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1
+
+ send "head-watch $tid1 $hid1" to p1
+ send "head-watch $tid1 $hid1" to p2
+
+ expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1
+ expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p2 capture w2_1
+
+ local:
+ expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r1)
+
+ send "head-replace $tid1 $hid1 $r2 $r1" to p1
+ expect /head-replace-fail $tid1 $hid1 $r2 $r1 $r1/ from p1
+
+ send "head-replace $tid1 $hid1 $r1 $r2" to p1
+ expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1
+
+ local:
+ expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r2)
+
+ local:
+ expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r2)
+
+ # Watch distinct heads and head types
+
+ send "head-create $tid1 $r1" to p1
+ expect /head-create-done $tid1 ([0-9a-f-]+)/ from p1 capture hid1_2
+
+ send "head-create $tid2 $r2" to p1
+ expect /head-create-done $tid2 ([0-9a-f-]+)/ from p1 capture hid2
+
+ send "head-watch $tid1 $hid1_2" to p1
+ send "head-watch $tid1 $hid1_2" to p2
+ send "head-watch $tid2 $hid2" to p1
+ send "head-watch $tid2 $hid2" to p2
+
+ expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p1 capture w1_1_2
+ expect /head-watch-done $tid1 $hid1_2 ([0-9]+)/ from p2 capture w2_1_2
+ expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p1 capture w1_2
+ expect /head-watch-done $tid2 $hid2 ([0-9]+)/ from p2 capture w2_2
+
+ local:
+ expect /head-watch-cb $w1_1_2 (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb $w1_2 (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r2)
+
+ local:
+ expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r2)
+
+ send "head-replace $tid1 $hid1_2 $r1 $r2" to p1
+ expect /head-replace-done $tid1 $hid1_2 $r1 $r2/ from p1
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new
+ guard (w == w1_1_2)
+ guard (new == r2)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new
+ guard (w == w2_1_2)
+ guard (new == r2)
+
+ send "head-replace $tid2 $hid2 $r2 $r3" to p1
+ expect /head-replace-done $tid2 $hid2 $r2 $r3/ from p1
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new
+ guard (w == w1_2)
+ guard (new == r3)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new
+ guard (w == w2_2)
+ guard (new == r3)
+
+ # Unwatch head
+
+ send "head-unwatch $w1_1_2" to p1
+ expect /head-unwatch-done $w1_1_2/ from p1
+
+ send "head-unwatch $w1_2" to p1
+ expect /head-unwatch-done $w1_2/ from p1
+
+ send "head-replace $tid2 $hid2 $r3 $r4" to p1
+ expect /head-replace-done $tid2 $hid2 $r3 $r4/ from p1
+
+ send "head-replace $tid1 $hid1_2 $r2 $r1" to p1
+ expect /head-replace-done $tid1 $hid1_2 $r2 $r1/ from p1
+
+ send "head-replace $tid1 $hid1 $r2 $r3" to p1
+ expect /head-replace-done $tid1 $hid1 $r2 $r3/ from p1
+
+ local:
+ expect /head-watch-cb $w2_2 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r4)
+
+ local:
+ expect /head-watch-cb $w2_1_2 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r3)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new
+ guard (w == w1_1)
+ guard (new == r3)
+
+ # Multiple watchers on the same head
+
+ send "head-watch $tid1 $hid1" to p1
+ expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1b
+
+ local:
+ expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r3)
+
+ send "head-watch $tid1 $hid1" to p1
+ expect /head-watch-done $tid1 $hid1 ([0-9]+)/ from p1 capture w1_1c
+
+ local:
+ expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r3)
+
+ send "head-replace $tid1 $hid1 $r3 $r4" to p1
+ expect /head-replace-done $tid1 $hid1 $r3 $r4/ from p1
+
+ local:
+ expect /head-watch-cb $w1_1 (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r4)
+
+ local:
+ expect /head-watch-cb $w1_1b (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r4)
+
+ local:
+ expect /head-watch-cb $w1_1c (blake2#[0-9a-f]+)/ from p1 capture new
+ guard (new == r4)
+
+ local:
+ expect /head-watch-cb $w2_1 (blake2#[0-9a-f]+)/ from p2 capture new
+ guard (new == r4)
+
+ for w in [ w1_1, w1_1c ]:
+ send "head-unwatch $w" to p1
+ expect /head-unwatch-done $w/ from p1
+
+ send "head-replace $tid1 $hid1 $r4 $r1" to p1
+ expect /head-replace-done $tid1 $hid1 $r4 $r1/ from p1
+
+ send "head-replace $tid1 $hid1 $r1 $r2" to p1
+ expect /head-replace-done $tid1 $hid1 $r1 $r2/ from p1
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new
+ guard (w == w2_1)
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p2 capture w, new
+ guard (w == w2_1)
+ guard (new == r2)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new
+ guard (w == w1_1b)
+ guard (new == r1)
+
+ local:
+ expect /head-watch-cb ([0-9]+) (blake2#[0-9a-f]+)/ from p1 capture w, new
+ guard (w == w1_1b)
+ guard (new == r2)
+
+
+test SharedStateWatcher:
+ spawn as p1
spawn as p2
send "create-identity Device1 Owner" to p1
send "create-identity Device2" to p2