diff options
-rw-r--r-- | erebos.cabal | 1 | ||||
-rw-r--r-- | main/Test.hs | 101 | ||||
-rw-r--r-- | main/Test/Service.hs | 21 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 82 | ||||
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 32 | ||||
-rw-r--r-- | src/Erebos/Service.hs | 7 | ||||
-rw-r--r-- | src/Erebos/Service/Stream.hs | 74 | ||||
-rw-r--r-- | test/network.test | 52 |
8 files changed, 315 insertions, 55 deletions
diff --git a/erebos.cabal b/erebos.cabal index 12c2968..f001a24 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -112,6 +112,7 @@ library Erebos.Pairing Erebos.PubKey Erebos.Service + Erebos.Service.Stream Erebos.Set Erebos.State Erebos.Storable diff --git a/main/Test.hs b/main/Test.hs index f2adf22..44818fd 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -44,6 +44,7 @@ import Erebos.Object import Erebos.Pairing import Erebos.PubKey import Erebos.Service +import Erebos.Service.Stream import Erebos.Set import Erebos.State import Erebos.Storable @@ -66,10 +67,17 @@ data TestState = TestState data RunningServer = RunningServer { rsServer :: Server - , rsPeers :: MVar (Int, [(Int, Peer)]) + , rsPeers :: MVar ( Int, [ TestPeer ] ) , rsPeerThread :: ThreadId } +data TestPeer = TestPeer + { tpIndex :: Int + , tpPeer :: Peer + , tpStreamReaders :: MVar [ (Int, StreamReader ) ] + , tpStreamWriters :: MVar [ (Int, StreamWriter ) ] + } + initTestState :: TestState initTestState = TestState { tsHead = Nothing @@ -137,17 +145,20 @@ cmdOut line = do getPeer :: Text -> CommandM Peer -getPeer spidx = do +getPeer spidx = tpPeer <$> getTestPeer spidx + +getTestPeer :: Text -> CommandM TestPeer +getTestPeer spidx = do Just RunningServer {..} <- gets tsServer - Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers) + Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers) return peer -getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int +getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int getPeerIndex pmvar = do peer <- asks svcPeer - maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar) + maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar) -pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a +pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a pairingAttributes _ out peers prefix = PairingAttributes { pairingHookRequest = return () @@ -267,6 +278,9 @@ commands = map (T.pack *** id) , ("peer-drop", cmdPeerDrop) , ("peer-list", cmdPeerList) , ("test-message-send", cmdTestMessageSend) + , ("test-stream-open", cmdTestStreamOpen) + , ("test-stream-close", cmdTestStreamClose) + , ("test-stream-send", cmdTestStreamSend) , ("local-state-get", cmdLocalStateGet) , ("local-state-replace", cmdLocalStateReplace) , ("local-state-wait", cmdLocalStateWait) @@ -501,7 +515,20 @@ cmdStartServer = do { testMessageReceived = \obj otype len sref -> do liftIO $ do void $ store (headStorage h) obj - outLine out $ unwords ["test-message-received", otype, len, sref] + outLine out $ unwords [ "test-message-received", otype, len, sref ] + , testStreamsReceived = \streams -> do + pidx <- getPeerIndex rsPeers + liftIO $ do + nums <- mapM getStreamReaderNumber streams + outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums + forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do + let go = readStreamPacket stream >>= \case + StreamData seqNum bytes -> do + outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ] + go + StreamClosed seqNum -> do + outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] + go } sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" @@ -510,15 +537,23 @@ cmdStartServer = do rsPeerThread <- liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange rsServer - let printPeer (idx, p) = do - params <- peerIdentity p >>= return . \case + let printPeer TestPeer {..} = do + params <- peerIdentity tpPeer >>= return . \case PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) - _ -> [ "addr", show (peerAddress p) ] - outLine out $ unwords $ [ "peer", show idx ] ++ params + _ -> [ "addr", show (peerAddress tpPeer) ] + outLine out $ unwords $ [ "peer", show tpIndex ] ++ params - update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)]) - update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur - | otherwise = fmap (p:) <$> update (nid, ps) + update ( tpIndex, [] ) = do + tpPeer <- return peer + tpStreamReaders <- newMVar [] + tpStreamWriters <- newMVar [] + let tp = TestPeer {..} + printPeer tp + return ( tpIndex + 1, [ tp ] ) + + update cur@( nid, p : ps ) + | tpPeer p == peer = printPeer p >> return cur + | otherwise = fmap (p :) <$> update ( nid, ps ) modifyMVar_ rsPeers update @@ -555,10 +590,10 @@ cmdPeerList = do peers <- liftIO $ getCurrentPeerList rsServer tpeers <- liftIO $ readMVar rsPeers forM_ peers $ \peer -> do - Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers + Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers mbpid <- peerIdentity peer cmdOut $ unwords $ concat - [ [ "peer-list-item", show n ] + [ [ "peer-list-item", show (tpIndex tp) ] , [ "addr", show (peerAddress peer) ] , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) _ -> [] @@ -575,6 +610,40 @@ cmdTestMessageSend = do sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" +cmdTestStreamOpen :: Command +cmdTestStreamOpen = do + spidx : rest <- asks tiParams + tp <- getTestPeer spidx + count <- case rest of + [] -> return 1 + tcount : _ -> return $ read $ T.unpack tcount + + out <- asks tiOutput + runPeerService (tpPeer tp) $ do + streams <- openTestStreams count + afterCommit $ do + nums <- mapM getStreamWriterNumber streams + modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams) + outLine out $ unwords $ "test-stream-open-done" + : T.unpack spidx + : map show nums + +cmdTestStreamClose :: Command +cmdTestStreamClose = do + [ spidx, sid ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ closeStream stream + cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ] + +cmdTestStreamSend :: Command +cmdTestStreamSend = do + [ spidx, sid, content ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ writeStream stream $ encodeUtf8 content + cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ] + cmdLocalStateGet :: Command cmdLocalStateGet = do h <- getHead diff --git a/main/Test/Service.hs b/main/Test/Service.hs index 8c58dee..c0be07d 100644 --- a/main/Test/Service.hs +++ b/main/Test/Service.hs @@ -1,8 +1,11 @@ module Test.Service ( TestMessage(..), TestMessageAttributes(..), + + openTestStreams, ) where +import Control.Monad import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as BL @@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable data TestMessage = TestMessage (Stored Object) data TestMessageAttributes = TestMessageAttributes { testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage () + , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage () } instance Storable TestMessage where @@ -26,7 +31,10 @@ instance Service TestMessage where serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8" type ServiceAttributes TestMessage = TestMessageAttributes - defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ()) + defaultServiceAttributes _ = TestMessageAttributes + { testMessageReceived = \_ _ _ _ -> return () + , testStreamsReceived = \_ -> return () + } serviceHandler smsg = do let TestMessage sobj = fromStored smsg @@ -36,3 +44,14 @@ instance Service TestMessage where cb <- asks $ testMessageReceived . svcAttributes cb obj otype len (show $ refDigest $ storedRef sobj) _ -> return () + + streams <- receivedStreams + when (not $ null streams) $ do + cb <- asks $ testStreamsReceived . svcAttributes + cb streams + + +openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ] +openTestStreams count = do + replyPacket . TestMessage =<< mstore (Rec []) + replicateM count openStream diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 08f4e5c..0baeeb1 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -410,9 +410,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do bracket (open addr) close loop forkServerThread server $ forever $ do - (peer, svc, ref) <- atomically $ readTQueue chanSvc + ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server @@ -527,9 +527,7 @@ openStream = do conn <- readTVarP peerState >>= \case PeerConnected conn -> return conn _ -> throwError "can't open stream without established connection" - (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case - Right res -> return res - Left err -> throwError err + (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) addHeader hdr @@ -549,8 +547,8 @@ appendDistinct x (y:ys) | x == y = y : ys appendDistinct x [] = [x] handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO () + -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ] + -> TransportHeader -> [ PartialRef ] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer @@ -684,10 +682,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} - then do - void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show dgst + then do + streamReaders <- mapM acceptStream $ lookupNewStreams headers + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders ) + else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" @@ -812,7 +811,7 @@ notifyServicesOfPeer :: Peer -> STM () notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () @@ -888,19 +887,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: []) sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) -sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m () sendToPeerList peer parts = do let st = peerStorage peer - srefs <- liftIO $ fmap catMaybes $ forM parts $ \case - ServiceReply (Left x) use -> Just . (,use) <$> store st x - ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) - ServiceFinally act -> act >> return Nothing - let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) - packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] - liftIO $ atomically $ sendToPeerS peer ackedBy packet + res <- runExceptT $ do + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + _ -> return Nothing + + streamHeaders <- concat <$> do + (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do + forM parts $ \case + ServiceOpenStream cb -> do + conn <- lift (readTVar (peerState peer)) >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) + + lift $ writeTQueue (serverIOActions (peerServer peer)) $ do + liftIO $ forkServerThread (peerServer peer) handler + return [ ( hdr, cb writer ) ] + _ -> return [] + liftIO $ sequence_ $ map snd streamHeaders + + liftIO $ forM_ parts $ \case + ServiceFinally act -> act + _ -> return () + + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU + header = TransportHeader $ concat + [ [ ServiceType (serviceID $ head parts) ] + , map ServiceRef dgsts + , map fst streamHeaders + ] + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + + case res of + Right () -> return () + Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $ + "failed to send packet to " <> show (peerAddress peer) <> ": " <> err sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do @@ -940,10 +969,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing +runPeerService = runPeerServiceOn Nothing [] -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -968,6 +997,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd + , svcNewStreams = newStreams } reloadHead (serverOrigHead server) >>= \case Nothing -> atomically $ do diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index f5183b3..025f52c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -23,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -288,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go cGlobalState streamNumber stream + ) where go gs@GlobalState {..} streamNumber stream = do @@ -361,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } + +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -403,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do modifyTVar' cOutStreams $ filter ((snum /=) . fst) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) @@ -425,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index 50bded6..9030d04 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -113,10 +113,13 @@ data ServiceInput s = ServiceInput , svcPeerIdentity :: UnifiedIdentity , svcServer :: Server , svcPrintOp :: String -> IO () + , svcNewStreams :: [ RawStreamReader ] } -data ServiceReply s = ServiceReply (Either s (Stored s)) Bool - | ServiceFinally (IO ()) +data ServiceReply s + = ServiceReply (Either s (Stored s)) Bool + | ServiceOpenStream (RawStreamWriter -> IO ()) + | ServiceFinally (IO ()) data ServiceHandlerState s = ServiceHandlerState { svcValue :: ServiceState s diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs new file mode 100644 index 0000000..67df4d7 --- /dev/null +++ b/src/Erebos/Service/Stream.hs @@ -0,0 +1,74 @@ +module Erebos.Service.Stream ( + StreamPacket(..), + StreamReader, getStreamReaderNumber, + StreamWriter, getStreamWriterNumber, + openStream, receivedStreams, + readStreamPacket, writeStreamPacket, + writeStream, + closeStream, +) where + +import Control.Concurrent.MVar +import Control.Monad.Reader +import Control.Monad.Writer + +import Data.ByteString (ByteString) +import Data.Word + +import Erebos.Flow +import Erebos.Network +import Erebos.Network.Protocol +import Erebos.Service + + +data StreamReader = StreamReader RawStreamReader + +getStreamReaderNumber :: StreamReader -> IO Int +getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream + +data StreamWriter = StreamWriter (MVar StreamWriterData) + +data StreamWriterData = StreamWriterData + { swdStream :: RawStreamWriter + , swdSequence :: Maybe Word64 + } + +getStreamWriterNumber :: StreamWriter -> IO Int +getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream + + +openStream :: Service s => ServiceHandler s StreamWriter +openStream = do + mvar <- liftIO newEmptyMVar + tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ] + return $ StreamWriter mvar + +receivedStreams :: Service s => ServiceHandler s [ StreamReader ] +receivedStreams = do + map StreamReader <$> asks svcNewStreams + +readStreamPacket :: StreamReader -> IO StreamPacket +readStreamPacket (StreamReader stream) = do + readFlowIO (rsrFlow stream) + +writeStreamPacket :: StreamWriter -> StreamPacket -> IO () +writeStreamPacket (StreamWriter mvar) packet = do + withMVar mvar $ \swd -> do + writeFlowIO (rswFlow $ swdStream swd) packet + +writeStream :: StreamWriter -> ByteString -> IO () +writeStream (StreamWriter mvar) bytes = do + modifyMVar_ mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> do + writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes + return swd { swdSequence = Just (seqNum + 1) } + Nothing -> do + fail "writeStream: stream closed" + +closeStream :: StreamWriter -> IO () +closeStream (StreamWriter mvar) = do + withMVar mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum + Nothing -> fail "closeStream: stream already closed" diff --git a/test/network.test b/test/network.test index 52fcbee..0f49a1e 100644 --- a/test/network.test +++ b/test/network.test @@ -182,6 +182,58 @@ test ManyStreams: expect /test-message-received blob 100[2-4] $ref/ from p2 +test ServiceStreams: + let services = "test" + + spawn as p1 + spawn as p2 + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server services $services" to p1 + send "start-server services $services" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + send "test-stream-open 1" to p1 + expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1 + expect /test-stream-open-from 1 $stream1/ from p2 + + send "test-stream-send 1 $stream1 hello" to p1 + expect /test-stream-send-done 1 $stream1/ from p1 + expect /test-stream-received 1 $stream1 0 hello/ from p2 + + send "test-stream-close 1 $stream1" to p1 + expect /test-stream-close-done 1 $stream1/ from p1 + expect /test-stream-closed-from 1 $stream1 1/ from p2 + + send "test-stream-open 1 8" to p2 + expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 + expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1 + + let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ] + with p2: + for i in [ 1..20 ]: + for s in streams2: + send "test-stream-send 1 $s hello$i" + for i in [ 1..20 ]: + for s in streams2: + expect /test-stream-send-done 1 $s/ + for s in streams2: + send "test-stream-close 1 $s" + for s in streams2: + expect /test-stream-close-done 1 $s/ + with p1: + for i in [ 1..20 ]: + for s in streams2: + expect /test-stream-received 1 $s ${i-1} hello$i/ + for s in streams2: + expect /test-stream-closed-from 1 $s 20/ + + test MultipleServiceRefs: let services = "test" |