From 0a78dd7f3e56c4879771a60bb3b43b197ddb444d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 28 Jan 2026 20:01:31 +0100 Subject: Check component size when loading ondemand object --- src/Erebos/Network.hs | 19 ++++++++----------- src/Erebos/Network/Protocol.hs | 11 +++++++++-- src/Erebos/Object.hs | 38 ++++++++++++++++++++++++++++++++++++++ src/Erebos/Object/Deferred.hs | 29 ++++++++++++++++++++++++----- 4 files changed, 79 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 3a6f259..56af0bb 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -233,7 +233,9 @@ lookupNewStreams [] = [] newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef newWaitingRef dgst act = do peer@Peer {..} <- gets phPeer - wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) + let cb (DataRequestFulfilled ref) = act ref + cb _ = return () + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) maxBound cb <$> liftSTM (newTVar (Left [])) modifyTMVarP peerWaitingRefs (wref:) liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) return wref @@ -451,8 +453,8 @@ dataResponseWorker server = forever $ do Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case Right ref -> do - atomically (writeTVar tvar $ Right ref) - forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case + atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref) + forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () @@ -1064,15 +1066,10 @@ modifyServiceGlobalState server proxy f = do throwErebosError $ UnhandledService svc -data DataRequestResult - = DataRequestFulfilled Ref - | DataRequestRejected - | DataRequestInvalid - -requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m () -requestDataFromPeer peer@Peer {..} dgst callback = do +requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> Word64 -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m () +requestDataFromPeer peer@Peer {..} dgst bound callback = do liftIO $ atomically $ do - wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left []) + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) bound callback <$> newTVar (Left []) putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs writeTQueue (serverDataResponse peerServer_) ( peer, Nothing ) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 463bf40..34f1163 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -7,6 +7,7 @@ module Erebos.Network.Protocol ( SecurityRequirement(..), WaitingRef(..), + DataRequestResult(..), WaitingRefCallback, wrDigest, @@ -464,10 +465,16 @@ writeByteStringToStream stream = go 0 data WaitingRef = WaitingRef { wrefStorage :: Storage , wrefPartial :: PartialRef - , wrefAction :: Ref -> WaitingRefCallback - , wrefStatus :: TVar (Either [RefDigest] Ref) + , wrefBound :: Word64 + , wrefAction :: DataRequestResult -> WaitingRefCallback + , wrefStatus :: TVar (Either [ RefDigest ] DataRequestResult) } +data DataRequestResult + = DataRequestFulfilled Ref + | DataRequestRejected + | DataRequestBrokenBound + type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs index f00b63d..955b0d3 100644 --- a/src/Erebos/Object.hs +++ b/src/Erebos/Object.hs @@ -18,6 +18,44 @@ module Erebos.Object ( readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, + + componentSize, + partialComponentSize, ) where +import Data.ByteString.Lazy qualified as BL +import Data.Maybe +import Data.Set qualified as S +import Data.Word + import Erebos.Object.Internal + + +componentSize :: Ref -> Word64 +componentSize ref = go S.empty [ ref ] + where + go seen (r : rs) + | refDigest r `S.member` seen = go seen rs + | otherwise = objectSize r + go (S.insert (refDigest r) seen) (referredFrom r ++ rs) + go _ [] = 0 + + objectSize = fromIntegral . BL.length . lazyLoadBytes + referredFrom r = case load r of + Rec items -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items + _ -> [] + +partialComponentSize :: PartialRef -> IO Word64 +partialComponentSize ref = go S.empty [ ref ] + where + go seen (r : rs) + | refDigest r `S.member` seen = go seen rs + | otherwise = do + size <- objectSize r + referred <- referredFrom r + (size +) <$> go (S.insert (refDigest r) seen) (referred ++ rs) + go _ [] = return 0 + + objectSize r = either (const 0) (fromIntegral . BL.length) <$> ioLoadBytes r + referredFrom r = ioLoadObject r >>= return . \case + Right (Rec items) -> mapMaybe ((\case RecRef r' -> Just r'; _ -> Nothing) . snd) items + _ -> [] diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs index 1faa85b..396428d 100644 --- a/src/Erebos/Object/Deferred.hs +++ b/src/Erebos/Object/Deferred.hs @@ -1,5 +1,6 @@ module Erebos.Object.Deferred ( Deferred, + DeferredSize(..), DeferredResult(..), deferredRef, @@ -11,6 +12,8 @@ module Erebos.Object.Deferred ( import Control.Concurrent.MVar import Control.Monad.IO.Class +import Data.Word + import Erebos.Identity import Erebos.Network import Erebos.Object @@ -19,10 +22,15 @@ import Erebos.Storable data Deferred a = Deferred { deferredRef_ :: RefDigest + , deferredSize :: DeferredSize , deferredServer :: Server , deferredPeers :: [ RefDigest ] } +data DeferredSize + = DeferredExactSize Word64 + | DeferredMaximumSize Word64 + data DeferredResult a = DeferredLoaded (Stored a) | DeferredInvalid @@ -42,15 +50,26 @@ deferredLoad Deferred {..} = liftIO $ do liftIO (findPeer deferredServer matchPeer) >>= \case Just peer -> do - requestDataFromPeer peer deferredRef_ $ liftIO . \case - DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref + let bound = case deferredSize of + DeferredExactSize s -> s + DeferredMaximumSize s -> s + + checkSize ref = case deferredSize of + DeferredExactSize s -> componentSize ref == s + DeferredMaximumSize s -> componentSize ref <= s + + requestDataFromPeer peer deferredRef_ bound $ liftIO . \case + DataRequestFulfilled ref + | checkSize ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref + | otherwise -> putMVar mvar DeferredInvalid DataRequestRejected -> putMVar mvar DeferredFailed - DataRequestInvalid -> putMVar mvar DeferredInvalid + DataRequestBrokenBound -> putMVar mvar DeferredInvalid + Nothing -> putMVar mvar DeferredFailed return mvar -deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a) -deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..} +deferLoadWithServer :: Storable a => RefDigest -> DeferredSize -> Server -> [ RefDigest ] -> IO (Deferred a) +deferLoadWithServer deferredRef_ deferredSize deferredServer deferredPeers = return Deferred {..} identityDigests :: Foldable f => Identity f -> [ RefDigest ] -- cgit v1.2.3