summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/Test/Service.hs2
-rw-r--r--src/Erebos/Network.hs19
-rw-r--r--src/Erebos/Network/Protocol.hs11
-rw-r--r--src/Erebos/Object.hs38
-rw-r--r--src/Erebos/Object/Deferred.hs29
-rw-r--r--test/deferred.et148
6 files changed, 228 insertions, 19 deletions
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index 156b62c..c952c86 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -60,7 +60,7 @@ instance Service TestMessage where
cb <- asks $ testOnDemandReceived . svcAttributes
server <- asks svcServer
pid <- asks svcPeerIdentity
- cb size =<< liftIO (deferLoadWithServer dgst server [ refDigest $ storedRef $ idData pid ])
+ cb size =<< liftIO (deferLoadWithServer dgst (DeferredExactSize size) server [ refDigest $ storedRef $ idData pid ])
_ -> return ()
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 3a6f259..56af0bb 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -233,7 +233,9 @@ lookupNewStreams [] = []
newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
newWaitingRef dgst act = do
peer@Peer {..} <- gets phPeer
- wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
+ let cb (DataRequestFulfilled ref) = act ref
+ cb _ = return ()
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) maxBound cb <$> liftSTM (newTVar (Left []))
modifyTMVarP peerWaitingRefs (wref:)
liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
return wref
@@ -451,8 +453,8 @@ dataResponseWorker server = forever $ do
Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of
[] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
Right ref -> do
- atomically (writeTVar tvar $ Right ref)
- forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
+ atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref)
+ forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err)
Right () -> return ()
@@ -1064,15 +1066,10 @@ modifyServiceGlobalState server proxy f = do
throwErebosError $ UnhandledService svc
-data DataRequestResult
- = DataRequestFulfilled Ref
- | DataRequestRejected
- | DataRequestInvalid
-
-requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m ()
-requestDataFromPeer peer@Peer {..} dgst callback = do
+requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> Word64 -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m ()
+requestDataFromPeer peer@Peer {..} dgst bound callback = do
liftIO $ atomically $ do
- wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left [])
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) bound callback <$> newTVar (Left [])
putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs
writeTQueue (serverDataResponse peerServer_) ( peer, Nothing )
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 463bf40..34f1163 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -7,6 +7,7 @@ module Erebos.Network.Protocol (
SecurityRequirement(..),
WaitingRef(..),
+ DataRequestResult(..),
WaitingRefCallback,
wrDigest,
@@ -464,10 +465,16 @@ writeByteStringToStream stream = go 0
data WaitingRef = WaitingRef
{ wrefStorage :: Storage
, wrefPartial :: PartialRef
- , wrefAction :: Ref -> WaitingRefCallback
- , wrefStatus :: TVar (Either [RefDigest] Ref)
+ , wrefBound :: Word64
+ , wrefAction :: DataRequestResult -> WaitingRefCallback
+ , wrefStatus :: TVar (Either [ RefDigest ] DataRequestResult)
}
+data DataRequestResult
+ = DataRequestFulfilled Ref
+ | DataRequestRejected
+ | DataRequestBrokenBound
+
type WaitingRefCallback = ExceptT ErebosError IO ()
wrDigest :: WaitingRef -> RefDigest
diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs
index f00b63d..955b0d3 100644
--- a/src/Erebos/Object.hs
+++ b/src/Erebos/Object.hs
@@ -18,6 +18,44 @@ module Erebos.Object (
readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
+
+ componentSize,
+ partialComponentSize,
) where
+import Data.ByteString.Lazy qualified as BL
+import Data.Maybe
+import Data.Set qualified as S
+import Data.Word
+
import Erebos.Object.Internal
+
+
+componentSize :: Ref -> Word64
+componentSize ref = go S.empty [ ref ]
+ where
+ go seen (r : rs)
+ | refDigest r `S.member` seen = go seen rs
+ | otherwise = objectSize r + go (S.insert (refDigest r) seen) (referredFrom r ++ rs)
+ go _ [] = 0
+
+ objectSize = fromIntegral . BL.length . lazyLoadBytes
+ referredFrom r = case load r of
+ Rec items -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items
+ _ -> []
+
+partialComponentSize :: PartialRef -> IO Word64
+partialComponentSize ref = go S.empty [ ref ]
+ where
+ go seen (r : rs)
+ | refDigest r `S.member` seen = go seen rs
+ | otherwise = do
+ size <- objectSize r
+ referred <- referredFrom r
+ (size +) <$> go (S.insert (refDigest r) seen) (referred ++ rs)
+ go _ [] = return 0
+
+ objectSize r = either (const 0) (fromIntegral . BL.length) <$> ioLoadBytes r
+ referredFrom r = ioLoadObject r >>= return . \case
+ Right (Rec items) -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items
+ _ -> []
diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs
index 1faa85b..396428d 100644
--- a/src/Erebos/Object/Deferred.hs
+++ b/src/Erebos/Object/Deferred.hs
@@ -1,5 +1,6 @@
module Erebos.Object.Deferred (
Deferred,
+ DeferredSize(..),
DeferredResult(..),
deferredRef,
@@ -11,6 +12,8 @@ module Erebos.Object.Deferred (
import Control.Concurrent.MVar
import Control.Monad.IO.Class
+import Data.Word
+
import Erebos.Identity
import Erebos.Network
import Erebos.Object
@@ -19,10 +22,15 @@ import Erebos.Storable
data Deferred a = Deferred
{ deferredRef_ :: RefDigest
+ , deferredSize :: DeferredSize
, deferredServer :: Server
, deferredPeers :: [ RefDigest ]
}
+data DeferredSize
+ = DeferredExactSize Word64
+ | DeferredMaximumSize Word64
+
data DeferredResult a
= DeferredLoaded (Stored a)
| DeferredInvalid
@@ -42,15 +50,26 @@ deferredLoad Deferred {..} = liftIO $ do
liftIO (findPeer deferredServer matchPeer) >>= \case
Just peer -> do
- requestDataFromPeer peer deferredRef_ $ liftIO . \case
- DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ let bound = case deferredSize of
+ DeferredExactSize s -> s
+ DeferredMaximumSize s -> s
+
+ checkSize ref = case deferredSize of
+ DeferredExactSize s -> componentSize ref == s
+ DeferredMaximumSize s -> componentSize ref <= s
+
+ requestDataFromPeer peer deferredRef_ bound $ liftIO . \case
+ DataRequestFulfilled ref
+ | checkSize ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ | otherwise -> putMVar mvar DeferredInvalid
DataRequestRejected -> putMVar mvar DeferredFailed
- DataRequestInvalid -> putMVar mvar DeferredInvalid
+ DataRequestBrokenBound -> putMVar mvar DeferredInvalid
+
Nothing -> putMVar mvar DeferredFailed
return mvar
-deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a)
-deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..}
+deferLoadWithServer :: Storable a => RefDigest -> DeferredSize -> Server -> [ RefDigest ] -> IO (Deferred a)
+deferLoadWithServer deferredRef_ deferredSize deferredServer deferredPeers = return Deferred {..}
identityDigests :: Foldable f => Identity f -> [ RefDigest ]
diff --git a/test/deferred.et b/test/deferred.et
index c514577..35585cd 100644
--- a/test/deferred.et
+++ b/test/deferred.et
@@ -48,3 +48,151 @@ test OnDemandLoad:
send "load-type $blob_ref"
expect /load-type blob/
+
+test OnDemandValidity:
+ let services = "test"
+
+ spawn as p1
+ spawn as p2
+
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server services $services" to p1
+ send "start-server services $services" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "test"
+ send ""
+ expect /store-done ($refpat)/ capture ref
+
+ let size = 12
+
+ send "store ondemand"
+ send "${size - 1}"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_small
+
+ send "store ondemand"
+ send "$size"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_correct
+
+ send "store ondemand"
+ send "${size + 1}"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_big
+
+ send "test-message-send 1 $ondemand_ref_small"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_small/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-invalid $idx/
+
+ send "test-message-send 1 $ondemand_ref_correct"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_correct/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-done $idx blob [0-9]+/
+
+ send "test-message-send 1 $ondemand_ref_big"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_big/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-invalid $idx/
+
+ with p1:
+ send "store rec"
+ send "r:t root1"
+ send ""
+ expect /store-done ($refpat)/ capture rec_root_1
+
+ send "store rec"
+ send "r:t root2"
+ send ""
+ expect /store-done ($refpat)/ capture rec_root_2
+
+ send "store rec"
+ send "r:r $rec_root_1"
+ send ""
+ expect /store-done ($refpat)/ capture rec_merge_1
+
+ send "store rec"
+ send "r:r $rec_root_1"
+ send "r:r $rec_root_2"
+ send "r:t merge2"
+ send ""
+ expect /store-done ($refpat)/ capture rec_merge_2
+
+ send "store rec"
+ send "r:r $rec_root_1"
+ send "r:r $rec_root_2"
+ send "r:t merge3"
+ send ""
+ expect /store-done ($refpat)/ capture rec_merge_3
+
+ send "store rec"
+ send "r:r $rec_merge_1"
+ send "r:r $rec_merge_2"
+ send "r:r $rec_merge_3"
+ send ""
+ expect /store-done ($refpat)/ capture ref
+
+ let size = 695
+
+ send "store ondemand"
+ send "${size - 1}"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_small
+
+ send "store ondemand"
+ send "$size"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_correct
+
+ send "store ondemand"
+ send "${size + 1}"
+ send "$ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref_big
+
+ send "test-message-send 1 $ondemand_ref_small"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_small/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-invalid $idx/
+
+ send "test-message-send 1 $ondemand_ref_correct"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_correct/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-done $idx [a-z]+ [0-9]+/
+
+ send "test-message-send 1 $ondemand_ref_big"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref_big/
+ expect /test-ondemand-received ([0-9]+) [0-9]+ $ref/ capture idx
+ send "load-deferred $idx"
+ expect /load-deferred-invalid $idx/