summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2026-01-25 10:22:04 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2026-01-27 19:45:23 +0100
commit66bfcd8ad4ef16dcd0e287004dc08f8948589bce (patch)
tree337a1658cc4ff76c14254a0d69aafd6c61765a14
parent7e0685f049f8981c4f11c3c83caacf85bc855577 (diff)
Deferred object loading
-rw-r--r--erebos.cabal1
-rw-r--r--main/Test.hs47
-rw-r--r--main/Test/Service.hs13
-rw-r--r--src/Erebos/Network.hs14
-rw-r--r--src/Erebos/Object/Deferred.hs57
-rw-r--r--test/deferred.et50
6 files changed, 173 insertions, 9 deletions
diff --git a/erebos.cabal b/erebos.cabal
index eca0204..9229768 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -110,6 +110,7 @@ library
Erebos.Invite
Erebos.Network
Erebos.Object
+ Erebos.Object.Deferred
Erebos.Pairing
Erebos.PubKey
Erebos.Service
diff --git a/main/Test.hs b/main/Test.hs
index 5b6509a..6896c9a 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -42,6 +42,7 @@ import Erebos.Identity
import Erebos.Invite
import Erebos.Network
import Erebos.Object
+import Erebos.Object.Deferred
import Erebos.Pairing
import Erebos.PubKey
import Erebos.Service
@@ -70,6 +71,7 @@ data RunningServer = RunningServer
{ rsServer :: Server
, rsPeers :: MVar ( Int, [ TestPeer ] )
, rsPeerThread :: ThreadId
+ , rsDeferredObjects :: MVar [ Deferred Object ]
}
data TestPeer = TestPeer
@@ -284,6 +286,7 @@ commands =
, ( "store-raw", cmdStoreRaw )
, ( "load", cmdLoad )
, ( "load-type", cmdLoadType )
+ , ( "load-deferred", cmdLoadDeferred )
, ( "stored-generation", cmdStoredGeneration )
, ( "stored-roots", cmdStoredRoots )
, ( "stored-set-add", cmdStoredSetAdd )
@@ -392,15 +395,36 @@ cmdLoadType :: Command
cmdLoadType = do
st <- asks tiStorage
[ tref ] <- asks tiParams
- Just ref <- liftIO $ readRef st $ encodeUtf8 tref
- let obj = load @Object ref
- let otype = case obj of
- Blob {} -> "blob"
- Rec {} -> "rec"
- OnDemand {} -> "ondemand"
- ZeroObject {} -> "zero"
- UnknownObject utype _ -> "unknown " <> decodeUtf8 utype
- cmdOut $ "load-type " <> T.unpack otype
+ liftIO (readRef st $ encodeUtf8 tref) >>= \case
+ Just ref -> do
+ let obj = load @Object ref
+ let otype = case obj of
+ Blob {} -> "blob"
+ Rec {} -> "rec"
+ OnDemand {} -> "ondemand"
+ ZeroObject {} -> "zero"
+ UnknownObject utype _ -> "unknown " <> decodeUtf8 utype
+ cmdOut $ "load-type " <> T.unpack otype
+ Nothing -> do
+ cmdOut $ "load-type-failed"
+
+cmdLoadDeferred :: Command
+cmdLoadDeferred = do
+ st <- asks tiStorage
+ [ tidx ] <- asks tiParams
+ Just RunningServer {..} <- gets tsServer
+ deferred <- (!! read (T.unpack tidx)) <$> liftIO (readMVar rsDeferredObjects)
+ mvar <- deferredLoad deferred
+ out <- asks tiOutput
+ liftIO $ void $ forkIO $ readMVar mvar >>= \case
+ DeferredLoaded sobj -> do
+ void $ copyRef st $ storedRef sobj
+ header : _ <- return $ BL.lines $ serializeObject $ fromStored sobj
+ outLine out $ T.unpack $ T.unwords [ "load-deferred-done", tidx, decodeUtf8 $ BL.toStrict header ]
+ DeferredInvalid -> do
+ outLine out $ T.unpack $ T.unwords [ "load-deferred-invalid", tidx ]
+ DeferredFailed -> do
+ outLine out $ T.unpack $ T.unwords [ "load-deferred-failed", tidx ]
cmdStoredGeneration :: Command
cmdStoredGeneration = do
@@ -581,6 +605,7 @@ cmdStartServer = do
h <- getOrLoadHead
rsPeers <- liftIO $ newMVar (1, [])
+ rsDeferredObjects <- liftIO $ newMVar []
services <- forM serviceNames $ \case
( "attach", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach"
( "chatroom", _ ) -> return $ someService @ChatroomService Proxy
@@ -609,6 +634,10 @@ cmdStartServer = do
StreamClosed seqNum -> do
outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ]
go
+ , testOnDemandReceived = \size deferred -> do
+ liftIO $ do
+ idx <- modifyMVar rsDeferredObjects (\ds -> return ( ds ++ [ deferred ], length ds ))
+ outLine out $ unwords [ "test-ondemand-received", show idx, show size, show $ deferredRef deferred ]
}
( sname, _ ) -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index c0be07d..156b62c 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -9,9 +9,12 @@ import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as BL
+import Data.Word
+import Erebos.Identity
import Erebos.Network
import Erebos.Object
+import Erebos.Object.Deferred
import Erebos.Service
import Erebos.Service.Stream
import Erebos.Storable
@@ -21,6 +24,7 @@ data TestMessage = TestMessage (Stored Object)
data TestMessageAttributes = TestMessageAttributes
{ testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage ()
, testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage ()
+ , testOnDemandReceived :: Word64 -> Deferred Object -> ServiceHandler TestMessage ()
}
instance Storable TestMessage where
@@ -34,6 +38,7 @@ instance Service TestMessage where
defaultServiceAttributes _ = TestMessageAttributes
{ testMessageReceived = \_ _ _ _ -> return ()
, testStreamsReceived = \_ -> return ()
+ , testOnDemandReceived = \_ _ -> return ()
}
serviceHandler smsg = do
@@ -50,6 +55,14 @@ instance Service TestMessage where
cb <- asks $ testStreamsReceived . svcAttributes
cb streams
+ case obj of
+ OnDemand size dgst -> do
+ cb <- asks $ testOnDemandReceived . svcAttributes
+ server <- asks svcServer
+ pid <- asks svcPeerIdentity
+ cb size =<< liftIO (deferLoadWithServer dgst server [ refDigest $ storedRef $ idData pid ])
+ _ -> return ()
+
openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ]
openTestStreams count = do
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index b5cfa6b..3a6f259 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -26,6 +26,7 @@ module Erebos.Network (
sendToPeerWith,
runPeerService,
modifyServiceGlobalState,
+ requestDataFromPeer, DataRequestResult(..),
discoveryPort,
) where
@@ -1063,6 +1064,19 @@ modifyServiceGlobalState server proxy f = do
throwErebosError $ UnhandledService svc
+data DataRequestResult
+ = DataRequestFulfilled Ref
+ | DataRequestRejected
+ | DataRequestInvalid
+
+requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m ()
+requestDataFromPeer peer@Peer {..} dgst callback = do
+ liftIO $ atomically $ do
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left [])
+ putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs
+ writeTQueue (serverDataResponse peerServer_) ( peer, Nothing )
+
+
foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs
new file mode 100644
index 0000000..1faa85b
--- /dev/null
+++ b/src/Erebos/Object/Deferred.hs
@@ -0,0 +1,57 @@
+module Erebos.Object.Deferred (
+ Deferred,
+ DeferredResult(..),
+
+ deferredRef,
+ deferredLoad,
+
+ deferLoadWithServer,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.IO.Class
+
+import Erebos.Identity
+import Erebos.Network
+import Erebos.Object
+import Erebos.Storable
+
+
+data Deferred a = Deferred
+ { deferredRef_ :: RefDigest
+ , deferredServer :: Server
+ , deferredPeers :: [ RefDigest ]
+ }
+
+data DeferredResult a
+ = DeferredLoaded (Stored a)
+ | DeferredInvalid
+ | DeferredFailed
+
+deferredRef :: Deferred a -> RefDigest
+deferredRef = deferredRef_
+
+deferredLoad :: MonadIO m => Storable a => Deferred a -> m (MVar (DeferredResult a))
+deferredLoad Deferred {..} = liftIO $ do
+ mvar <- newEmptyMVar
+ let matchPeer peer =
+ getPeerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ any (`elem` identityDigests pid) deferredPeers
+ _ -> return False
+
+ liftIO (findPeer deferredServer matchPeer) >>= \case
+ Just peer -> do
+ requestDataFromPeer peer deferredRef_ $ liftIO . \case
+ DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ DataRequestRejected -> putMVar mvar DeferredFailed
+ DataRequestInvalid -> putMVar mvar DeferredInvalid
+ Nothing -> putMVar mvar DeferredFailed
+ return mvar
+
+deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a)
+deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..}
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
diff --git a/test/deferred.et b/test/deferred.et
new file mode 100644
index 0000000..c514577
--- /dev/null
+++ b/test/deferred.et
@@ -0,0 +1,50 @@
+module deferred
+
+import common
+
+test OnDemandLoad:
+ let services = "test"
+
+ spawn as p1
+ spawn as p2
+
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server services $services" to p1
+ send "start-server services $services" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "test"
+ send ""
+ expect /store-done ($refpat)/ capture blob_ref
+
+ send "store ondemand"
+ send "12"
+ send "$blob_ref"
+ send ""
+ expect /store-done ($refpat)/ capture ondemand_ref
+
+ send "test-message-send 1 $ondemand_ref"
+ expect /test-message-send done/
+ with p2:
+ expect /test-message-received ondemand [0-9]+ $ondemand_ref/
+ expect /test-ondemand-received 0 [0-9]+ $blob_ref/
+
+ send "load-type $ondemand_ref"
+ expect /load-type ondemand/
+
+ send "load-type $blob_ref"
+ expect /load-type-failed/
+
+ send "load-deferred 0"
+ expect /load-deferred-done 0 blob [0-9]+/
+
+ send "load-type $blob_ref"
+ expect /load-type blob/