diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2026-01-28 20:01:31 +0100 |
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2026-01-28 22:39:02 +0100 |
| commit | 0a78dd7f3e56c4879771a60bb3b43b197ddb444d (patch) | |
| tree | 54b583569e37ff323d0e6c8b7f9a642d1fa4b395 | |
| parent | 66bfcd8ad4ef16dcd0e287004dc08f8948589bce (diff) | |
| -rw-r--r-- | main/Test/Service.hs | 2 | ||||
| -rw-r--r-- | src/Erebos/Network.hs | 19 | ||||
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 11 | ||||
| -rw-r--r-- | src/Erebos/Object.hs | 38 | ||||
| -rw-r--r-- | src/Erebos/Object/Deferred.hs | 29 | ||||
| -rw-r--r-- | test/deferred.et | 148 |
6 files changed, 228 insertions, 19 deletions
diff --git a/main/Test/Service.hs b/main/Test/Service.hs index 156b62c..c952c86 100644 --- a/main/Test/Service.hs +++ b/main/Test/Service.hs @@ -60,7 +60,7 @@ instance Service TestMessage where cb <- asks $ testOnDemandReceived . svcAttributes server <- asks svcServer pid <- asks svcPeerIdentity - cb size =<< liftIO (deferLoadWithServer dgst server [ refDigest $ storedRef $ idData pid ]) + cb size =<< liftIO (deferLoadWithServer dgst (DeferredExactSize size) server [ refDigest $ storedRef $ idData pid ]) _ -> return () diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 3a6f259..56af0bb 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -233,7 +233,9 @@ lookupNewStreams [] = [] newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef newWaitingRef dgst act = do peer@Peer {..} <- gets phPeer - wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) + let cb (DataRequestFulfilled ref) = act ref + cb _ = return () + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) maxBound cb <$> liftSTM (newTVar (Left [])) modifyTMVarP peerWaitingRefs (wref:) liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) return wref @@ -451,8 +453,8 @@ dataResponseWorker server = forever $ do Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case Right ref -> do - atomically (writeTVar tvar $ Right ref) - forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case + atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref) + forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () @@ -1064,15 +1066,10 @@ modifyServiceGlobalState server proxy f = do throwErebosError $ UnhandledService svc -data DataRequestResult - = DataRequestFulfilled Ref - | DataRequestRejected - | DataRequestInvalid - -requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m () -requestDataFromPeer peer@Peer {..} dgst callback = do +requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> Word64 -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m () +requestDataFromPeer peer@Peer {..} dgst bound callback = do liftIO $ atomically $ do - wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left []) + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) bound callback <$> newTVar (Left []) putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs writeTQueue (serverDataResponse peerServer_) ( peer, Nothing ) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 463bf40..34f1163 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -7,6 +7,7 @@ module Erebos.Network.Protocol ( SecurityRequirement(..), WaitingRef(..), + DataRequestResult(..), WaitingRefCallback, wrDigest, @@ -464,10 +465,16 @@ writeByteStringToStream stream = go 0 data WaitingRef = WaitingRef { wrefStorage :: Storage , wrefPartial :: PartialRef - , wrefAction :: Ref -> WaitingRefCallback - , wrefStatus :: TVar (Either [RefDigest] Ref) + , wrefBound :: Word64 + , wrefAction :: DataRequestResult -> WaitingRefCallback + , wrefStatus :: TVar (Either [ RefDigest ] DataRequestResult) } +data DataRequestResult + = DataRequestFulfilled Ref + | DataRequestRejected + | DataRequestBrokenBound + type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs index f00b63d..955b0d3 100644 --- a/src/Erebos/Object.hs +++ b/src/Erebos/Object.hs @@ -18,6 +18,44 @@ module Erebos.Object ( readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, + + componentSize, + partialComponentSize, ) where +import Data.ByteString.Lazy qualified as BL +import Data.Maybe +import Data.Set qualified as S +import Data.Word + import Erebos.Object.Internal + + +componentSize :: Ref -> Word64 +componentSize ref = go S.empty [ ref ] + where + go seen (r : rs) + | refDigest r `S.member` seen = go seen rs + | otherwise = objectSize r + go (S.insert (refDigest r) seen) (referredFrom r ++ rs) + go _ [] = 0 + + objectSize = fromIntegral . BL.length . lazyLoadBytes + referredFrom r = case load r of + Rec items -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items + _ -> [] + +partialComponentSize :: PartialRef -> IO Word64 +partialComponentSize ref = go S.empty [ ref ] + where + go seen (r : rs) + | refDigest r `S.member` seen = go seen rs + | otherwise = do + size <- objectSize r + referred <- referredFrom r + (size +) <$> go (S.insert (refDigest r) seen) (referred ++ rs) + go _ [] = return 0 + + objectSize r = either (const 0) (fromIntegral . BL.length) <$> ioLoadBytes r + referredFrom r = ioLoadObject r >>= return . \case + Right (Rec items) -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items + _ -> [] diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs index 1faa85b..396428d 100644 --- a/src/Erebos/Object/Deferred.hs +++ b/src/Erebos/Object/Deferred.hs @@ -1,5 +1,6 @@ module Erebos.Object.Deferred ( Deferred, + DeferredSize(..), DeferredResult(..), deferredRef, @@ -11,6 +12,8 @@ module Erebos.Object.Deferred ( import Control.Concurrent.MVar import Control.Monad.IO.Class +import Data.Word + import Erebos.Identity import Erebos.Network import Erebos.Object @@ -19,10 +22,15 @@ import Erebos.Storable data Deferred a = Deferred { deferredRef_ :: RefDigest + , deferredSize :: DeferredSize , deferredServer :: Server , deferredPeers :: [ RefDigest ] } +data DeferredSize + = DeferredExactSize Word64 + | DeferredMaximumSize Word64 + data DeferredResult a = DeferredLoaded (Stored a) | DeferredInvalid @@ -42,15 +50,26 @@ deferredLoad Deferred {..} = liftIO $ do liftIO (findPeer deferredServer matchPeer) >>= \case Just peer -> do - requestDataFromPeer peer deferredRef_ $ liftIO . \case - DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref + let bound = case deferredSize of + DeferredExactSize s -> s + DeferredMaximumSize s -> s + + checkSize ref = case deferredSize of + DeferredExactSize s -> componentSize ref == s + DeferredMaximumSize s -> componentSize ref <= s + + requestDataFromPeer peer deferredRef_ bound $ liftIO . \case + DataRequestFulfilled ref + | checkSize ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref + | otherwise -> putMVar mvar DeferredInvalid DataRequestRejected -> putMVar mvar DeferredFailed - DataRequestInvalid -> putMVar mvar DeferredInvalid + DataRequestBrokenBound -> putMVar mvar DeferredInvalid + Nothing -> putMVar mvar DeferredFailed return mvar -deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a) -deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..} +deferLoadWithServer :: Storable a => RefDigest -> DeferredSize -> Server -> [ RefDigest ] -> IO (Deferred a) +deferLoadWithServer deferredRef_ deferredSize deferredServer deferredPeers = return Deferred {..} identityDigests :: Foldable f => Identity f -> [ RefDigest ] diff --git a/test/deferred.et b/test/deferred.et index c514577..35585cd 100644 --- a/test/deferred.et +++ b/test/deferred.et @@ -48,3 +48,151 @@ test OnDemandLoad: send "load-type $blob_ref" expect /load-type blob/ + +test OnDemandValidity: + let services = "test" + + spawn as p1 + spawn as p2 + + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server services $services" to p1 + send "start-server services $services" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "test" + send "" + expect /store-done ($refpat)/ capture ref + + let size = 12 + + send "store ondemand" + send "${size - 1}" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_small + + send "store ondemand" + send "$size" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_correct + + send "store ondemand" + send "${size + 1}" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_big + + send "test-message-send 1 $ondemand_ref_small" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_small/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-invalid $idx/ + + send "test-message-send 1 $ondemand_ref_correct" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_correct/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-done $idx blob [0-9]+/ + + send "test-message-send 1 $ondemand_ref_big" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_big/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-invalid $idx/ + + with p1: + send "store rec" + send "r:t root1" + send "" + expect /store-done ($refpat)/ capture rec_root_1 + + send "store rec" + send "r:t root2" + send "" + expect /store-done ($refpat)/ capture rec_root_2 + + send "store rec" + send "r:r $rec_root_1" + send "" + expect /store-done ($refpat)/ capture rec_merge_1 + + send "store rec" + send "r:r $rec_root_1" + send "r:r $rec_root_2" + send "r:t merge2" + send "" + expect /store-done ($refpat)/ capture rec_merge_2 + + send "store rec" + send "r:r $rec_root_1" + send "r:r $rec_root_2" + send "r:t merge3" + send "" + expect /store-done ($refpat)/ capture rec_merge_3 + + send "store rec" + send "r:r $rec_merge_1" + send "r:r $rec_merge_2" + send "r:r $rec_merge_3" + send "" + expect /store-done ($refpat)/ capture ref + + let size = 695 + + send "store ondemand" + send "${size - 1}" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_small + + send "store ondemand" + send "$size" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_correct + + send "store ondemand" + send "${size + 1}" + send "$ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref_big + + send "test-message-send 1 $ondemand_ref_small" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_small/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-invalid $idx/ + + send "test-message-send 1 $ondemand_ref_correct" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_correct/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-done $idx [a-z]+ [0-9]+/ + + send "test-message-send 1 $ondemand_ref_big" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref_big/ + expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx + send "load-deferred $idx" + expect /load-deferred-invalid $idx/ |