summaryrefslogtreecommitdiff
path: root/src/Erebos
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos')
-rw-r--r--src/Erebos/Network.hs19
-rw-r--r--src/Erebos/Network/Protocol.hs11
-rw-r--r--src/Erebos/Object.hs38
-rw-r--r--src/Erebos/Object/Deferred.hs29
4 files changed, 79 insertions, 18 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 3a6f259..56af0bb 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -233,7 +233,9 @@ lookupNewStreams [] = []
newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
newWaitingRef dgst act = do
peer@Peer {..} <- gets phPeer
- wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
+ let cb (DataRequestFulfilled ref) = act ref
+ cb _ = return ()
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) maxBound cb <$> liftSTM (newTVar (Left []))
modifyTMVarP peerWaitingRefs (wref:)
liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
return wref
@@ -451,8 +453,8 @@ dataResponseWorker server = forever $ do
Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of
[] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
Right ref -> do
- atomically (writeTVar tvar $ Right ref)
- forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
+ atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref)
+ forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err)
Right () -> return ()
@@ -1064,15 +1066,10 @@ modifyServiceGlobalState server proxy f = do
throwErebosError $ UnhandledService svc
-data DataRequestResult
- = DataRequestFulfilled Ref
- | DataRequestRejected
- | DataRequestInvalid
-
-requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m ()
-requestDataFromPeer peer@Peer {..} dgst callback = do
+requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> Word64 -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m ()
+requestDataFromPeer peer@Peer {..} dgst bound callback = do
liftIO $ atomically $ do
- wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left [])
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) bound callback <$> newTVar (Left [])
putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs
writeTQueue (serverDataResponse peerServer_) ( peer, Nothing )
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 463bf40..34f1163 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -7,6 +7,7 @@ module Erebos.Network.Protocol (
SecurityRequirement(..),
WaitingRef(..),
+ DataRequestResult(..),
WaitingRefCallback,
wrDigest,
@@ -464,10 +465,16 @@ writeByteStringToStream stream = go 0
data WaitingRef = WaitingRef
{ wrefStorage :: Storage
, wrefPartial :: PartialRef
- , wrefAction :: Ref -> WaitingRefCallback
- , wrefStatus :: TVar (Either [RefDigest] Ref)
+ , wrefBound :: Word64
+ , wrefAction :: DataRequestResult -> WaitingRefCallback
+ , wrefStatus :: TVar (Either [ RefDigest ] DataRequestResult)
}
+data DataRequestResult
+ = DataRequestFulfilled Ref
+ | DataRequestRejected
+ | DataRequestBrokenBound
+
type WaitingRefCallback = ExceptT ErebosError IO ()
wrDigest :: WaitingRef -> RefDigest
diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs
index f00b63d..955b0d3 100644
--- a/src/Erebos/Object.hs
+++ b/src/Erebos/Object.hs
@@ -18,6 +18,44 @@ module Erebos.Object (
readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
+
+ componentSize,
+ partialComponentSize,
) where
+import Data.ByteString.Lazy qualified as BL
+import Data.Maybe
+import Data.Set qualified as S
+import Data.Word
+
import Erebos.Object.Internal
+
+
+componentSize :: Ref -> Word64
+componentSize ref = go S.empty [ ref ]
+ where
+ go seen (r : rs)
+ | refDigest r `S.member` seen = go seen rs
+ | otherwise = objectSize r + go (S.insert (refDigest r) seen) (referredFrom r ++ rs)
+ go _ [] = 0
+
+ objectSize = fromIntegral . BL.length . lazyLoadBytes
+ referredFrom r = case load r of
+ Rec items -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items
+ _ -> []
+
+partialComponentSize :: PartialRef -> IO Word64
+partialComponentSize ref = go S.empty [ ref ]
+ where
+ go seen (r : rs)
+ | refDigest r `S.member` seen = go seen rs
+ | otherwise = do
+ size <- objectSize r
+ referred <- referredFrom r
+ (size +) <$> go (S.insert (refDigest r) seen) (referred ++ rs)
+ go _ [] = return 0
+
+ objectSize r = either (const 0) (fromIntegral . BL.length) <$> ioLoadBytes r
+ referredFrom r = ioLoadObject r >>= return . \case
+ Right (Rec items) -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items
+ _ -> []
diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs
index 1faa85b..396428d 100644
--- a/src/Erebos/Object/Deferred.hs
+++ b/src/Erebos/Object/Deferred.hs
@@ -1,5 +1,6 @@
module Erebos.Object.Deferred (
Deferred,
+ DeferredSize(..),
DeferredResult(..),
deferredRef,
@@ -11,6 +12,8 @@ module Erebos.Object.Deferred (
import Control.Concurrent.MVar
import Control.Monad.IO.Class
+import Data.Word
+
import Erebos.Identity
import Erebos.Network
import Erebos.Object
@@ -19,10 +22,15 @@ import Erebos.Storable
data Deferred a = Deferred
{ deferredRef_ :: RefDigest
+ , deferredSize :: DeferredSize
, deferredServer :: Server
, deferredPeers :: [ RefDigest ]
}
+data DeferredSize
+ = DeferredExactSize Word64
+ | DeferredMaximumSize Word64
+
data DeferredResult a
= DeferredLoaded (Stored a)
| DeferredInvalid
@@ -42,15 +50,26 @@ deferredLoad Deferred {..} = liftIO $ do
liftIO (findPeer deferredServer matchPeer) >>= \case
Just peer -> do
- requestDataFromPeer peer deferredRef_ $ liftIO . \case
- DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ let bound = case deferredSize of
+ DeferredExactSize s -> s
+ DeferredMaximumSize s -> s
+
+ checkSize ref = case deferredSize of
+ DeferredExactSize s -> componentSize ref == s
+ DeferredMaximumSize s -> componentSize ref <= s
+
+ requestDataFromPeer peer deferredRef_ bound $ liftIO . \case
+ DataRequestFulfilled ref
+ | checkSize ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ | otherwise -> putMVar mvar DeferredInvalid
DataRequestRejected -> putMVar mvar DeferredFailed
- DataRequestInvalid -> putMVar mvar DeferredInvalid
+ DataRequestBrokenBound -> putMVar mvar DeferredInvalid
+
Nothing -> putMVar mvar DeferredFailed
return mvar
-deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a)
-deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..}
+deferLoadWithServer :: Storable a => RefDigest -> DeferredSize -> Server -> [ RefDigest ] -> IO (Deferred a)
+deferLoadWithServer deferredRef_ deferredSize deferredServer deferredPeers = return Deferred {..}
identityDigests :: Foldable f => Identity f -> [ RefDigest ]