diff options
| -rw-r--r-- | erebos.cabal | 1 | ||||
| -rw-r--r-- | main/Test.hs | 47 | ||||
| -rw-r--r-- | main/Test/Service.hs | 13 | ||||
| -rw-r--r-- | src/Erebos/Network.hs | 14 | ||||
| -rw-r--r-- | src/Erebos/Object/Deferred.hs | 57 | ||||
| -rw-r--r-- | test/deferred.et | 50 |
6 files changed, 173 insertions, 9 deletions
diff --git a/erebos.cabal b/erebos.cabal index eca0204..9229768 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -110,6 +110,7 @@ library Erebos.Invite Erebos.Network Erebos.Object + Erebos.Object.Deferred Erebos.Pairing Erebos.PubKey Erebos.Service diff --git a/main/Test.hs b/main/Test.hs index 5b6509a..6896c9a 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -42,6 +42,7 @@ import Erebos.Identity import Erebos.Invite import Erebos.Network import Erebos.Object +import Erebos.Object.Deferred import Erebos.Pairing import Erebos.PubKey import Erebos.Service @@ -70,6 +71,7 @@ data RunningServer = RunningServer { rsServer :: Server , rsPeers :: MVar ( Int, [ TestPeer ] ) , rsPeerThread :: ThreadId + , rsDeferredObjects :: MVar [ Deferred Object ] } data TestPeer = TestPeer @@ -284,6 +286,7 @@ commands = , ( "store-raw", cmdStoreRaw ) , ( "load", cmdLoad ) , ( "load-type", cmdLoadType ) + , ( "load-deferred", cmdLoadDeferred ) , ( "stored-generation", cmdStoredGeneration ) , ( "stored-roots", cmdStoredRoots ) , ( "stored-set-add", cmdStoredSetAdd ) @@ -392,15 +395,36 @@ cmdLoadType :: Command cmdLoadType = do st <- asks tiStorage [ tref ] <- asks tiParams - Just ref <- liftIO $ readRef st $ encodeUtf8 tref - let obj = load @Object ref - let otype = case obj of - Blob {} -> "blob" - Rec {} -> "rec" - OnDemand {} -> "ondemand" - ZeroObject {} -> "zero" - UnknownObject utype _ -> "unknown " <> decodeUtf8 utype - cmdOut $ "load-type " <> T.unpack otype + liftIO (readRef st $ encodeUtf8 tref) >>= \case + Just ref -> do + let obj = load @Object ref + let otype = case obj of + Blob {} -> "blob" + Rec {} -> "rec" + OnDemand {} -> "ondemand" + ZeroObject {} -> "zero" + UnknownObject utype _ -> "unknown " <> decodeUtf8 utype + cmdOut $ "load-type " <> T.unpack otype + Nothing -> do + cmdOut $ "load-type-failed" + +cmdLoadDeferred :: Command +cmdLoadDeferred = do + st <- asks tiStorage + [ tidx ] <- asks tiParams + Just RunningServer {..} <- gets tsServer + deferred <- (!! read (T.unpack tidx)) <$> liftIO (readMVar rsDeferredObjects) + mvar <- deferredLoad deferred + out <- asks tiOutput + liftIO $ void $ forkIO $ readMVar mvar >>= \case + DeferredLoaded sobj -> do + void $ copyRef st $ storedRef sobj + header : _ <- return $ BL.lines $ serializeObject $ fromStored sobj + outLine out $ T.unpack $ T.unwords [ "load-deferred-done", tidx, decodeUtf8 $ BL.toStrict header ] + DeferredInvalid -> do + outLine out $ T.unpack $ T.unwords [ "load-deferred-invalid", tidx ] + DeferredFailed -> do + outLine out $ T.unpack $ T.unwords [ "load-deferred-failed", tidx ] cmdStoredGeneration :: Command cmdStoredGeneration = do @@ -581,6 +605,7 @@ cmdStartServer = do h <- getOrLoadHead rsPeers <- liftIO $ newMVar (1, []) + rsDeferredObjects <- liftIO $ newMVar [] services <- forM serviceNames $ \case ( "attach", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" ( "chatroom", _ ) -> return $ someService @ChatroomService Proxy @@ -609,6 +634,10 @@ cmdStartServer = do StreamClosed seqNum -> do outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] go + , testOnDemandReceived = \size deferred -> do + liftIO $ do + idx <- modifyMVar rsDeferredObjects (\ds -> return ( ds ++ [ deferred ], length ds )) + outLine out $ unwords [ "test-ondemand-received", show idx, show size, show $ deferredRef deferred ] } ( sname, _ ) -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" diff --git a/main/Test/Service.hs b/main/Test/Service.hs index c0be07d..156b62c 100644 --- a/main/Test/Service.hs +++ b/main/Test/Service.hs @@ -9,9 +9,12 @@ import Control.Monad import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as BL +import Data.Word +import Erebos.Identity import Erebos.Network import Erebos.Object +import Erebos.Object.Deferred import Erebos.Service import Erebos.Service.Stream import Erebos.Storable @@ -21,6 +24,7 @@ data TestMessage = TestMessage (Stored Object) data TestMessageAttributes = TestMessageAttributes { testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage () , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage () + , testOnDemandReceived :: Word64 -> Deferred Object -> ServiceHandler TestMessage () } instance Storable TestMessage where @@ -34,6 +38,7 @@ instance Service TestMessage where defaultServiceAttributes _ = TestMessageAttributes { testMessageReceived = \_ _ _ _ -> return () , testStreamsReceived = \_ -> return () + , testOnDemandReceived = \_ _ -> return () } serviceHandler smsg = do @@ -50,6 +55,14 @@ instance Service TestMessage where cb <- asks $ testStreamsReceived . svcAttributes cb streams + case obj of + OnDemand size dgst -> do + cb <- asks $ testOnDemandReceived . svcAttributes + server <- asks svcServer + pid <- asks svcPeerIdentity + cb size =<< liftIO (deferLoadWithServer dgst server [ refDigest $ storedRef $ idData pid ]) + _ -> return () + openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ] openTestStreams count = do diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index b5cfa6b..3a6f259 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -26,6 +26,7 @@ module Erebos.Network ( sendToPeerWith, runPeerService, modifyServiceGlobalState, + requestDataFromPeer, DataRequestResult(..), discoveryPort, ) where @@ -1063,6 +1064,19 @@ modifyServiceGlobalState server proxy f = do throwErebosError $ UnhandledService svc +data DataRequestResult + = DataRequestFulfilled Ref + | DataRequestRejected + | DataRequestInvalid + +requestDataFromPeer :: MonadIO m => Peer -> RefDigest -> (DataRequestResult -> ExceptT ErebosError IO ()) -> m () +requestDataFromPeer peer@Peer {..} dgst callback = do + liftIO $ atomically $ do + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) (callback . DataRequestFulfilled) <$> newTVar (Left []) + putTMVar peerWaitingRefs . (wref :) =<< takeTMVar peerWaitingRefs + writeTQueue (serverDataResponse peerServer_) ( peer, Nothing ) + + foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs new file mode 100644 index 0000000..1faa85b --- /dev/null +++ b/src/Erebos/Object/Deferred.hs @@ -0,0 +1,57 @@ +module Erebos.Object.Deferred ( + Deferred, + DeferredResult(..), + + deferredRef, + deferredLoad, + + deferLoadWithServer, +) where + +import Control.Concurrent.MVar +import Control.Monad.IO.Class + +import Erebos.Identity +import Erebos.Network +import Erebos.Object +import Erebos.Storable + + +data Deferred a = Deferred + { deferredRef_ :: RefDigest + , deferredServer :: Server + , deferredPeers :: [ RefDigest ] + } + +data DeferredResult a + = DeferredLoaded (Stored a) + | DeferredInvalid + | DeferredFailed + +deferredRef :: Deferred a -> RefDigest +deferredRef = deferredRef_ + +deferredLoad :: MonadIO m => Storable a => Deferred a -> m (MVar (DeferredResult a)) +deferredLoad Deferred {..} = liftIO $ do + mvar <- newEmptyMVar + let matchPeer peer = + getPeerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ any (`elem` identityDigests pid) deferredPeers + _ -> return False + + liftIO (findPeer deferredServer matchPeer) >>= \case + Just peer -> do + requestDataFromPeer peer deferredRef_ $ liftIO . \case + DataRequestFulfilled ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref + DataRequestRejected -> putMVar mvar DeferredFailed + DataRequestInvalid -> putMVar mvar DeferredInvalid + Nothing -> putMVar mvar DeferredFailed + return mvar + +deferLoadWithServer :: Storable a => RefDigest -> Server -> [ RefDigest ] -> IO (Deferred a) +deferLoadWithServer deferredRef_ deferredServer deferredPeers = return Deferred {..} + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid diff --git a/test/deferred.et b/test/deferred.et new file mode 100644 index 0000000..c514577 --- /dev/null +++ b/test/deferred.et @@ -0,0 +1,50 @@ +module deferred + +import common + +test OnDemandLoad: + let services = "test" + + spawn as p1 + spawn as p2 + + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server services $services" to p1 + send "start-server services $services" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "test" + send "" + expect /store-done ($refpat)/ capture blob_ref + + send "store ondemand" + send "12" + send "$blob_ref" + send "" + expect /store-done ($refpat)/ capture ondemand_ref + + send "test-message-send 1 $ondemand_ref" + expect /test-message-send done/ + with p2: + expect /test-message-received ondemand [0-9]+ $ondemand_ref/ + expect /test-ondemand-received 0 [0-9]+ $blob_ref/ + + send "load-type $ondemand_ref" + expect /load-type ondemand/ + + send "load-type $blob_ref" + expect /load-type-failed/ + + send "load-deferred 0" + expect /load-deferred-done 0 blob [0-9]+/ + + send "load-type $blob_ref" + expect /load-type blob/ |