summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--erebos.cabal1
-rw-r--r--main/Test.hs101
-rw-r--r--main/Test/Service.hs21
-rw-r--r--src/Erebos/Network.hs82
-rw-r--r--src/Erebos/Network/Protocol.hs32
-rw-r--r--src/Erebos/Service.hs7
-rw-r--r--src/Erebos/Service/Stream.hs74
-rw-r--r--test/network.test52
8 files changed, 315 insertions, 55 deletions
diff --git a/erebos.cabal b/erebos.cabal
index 12c2968..f001a24 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -112,6 +112,7 @@ library
Erebos.Pairing
Erebos.PubKey
Erebos.Service
+ Erebos.Service.Stream
Erebos.Set
Erebos.State
Erebos.Storable
diff --git a/main/Test.hs b/main/Test.hs
index f2adf22..44818fd 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -44,6 +44,7 @@ import Erebos.Object
import Erebos.Pairing
import Erebos.PubKey
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Set
import Erebos.State
import Erebos.Storable
@@ -66,10 +67,17 @@ data TestState = TestState
data RunningServer = RunningServer
{ rsServer :: Server
- , rsPeers :: MVar (Int, [(Int, Peer)])
+ , rsPeers :: MVar ( Int, [ TestPeer ] )
, rsPeerThread :: ThreadId
}
+data TestPeer = TestPeer
+ { tpIndex :: Int
+ , tpPeer :: Peer
+ , tpStreamReaders :: MVar [ (Int, StreamReader ) ]
+ , tpStreamWriters :: MVar [ (Int, StreamWriter ) ]
+ }
+
initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
@@ -137,17 +145,20 @@ cmdOut line = do
getPeer :: Text -> CommandM Peer
-getPeer spidx = do
+getPeer spidx = tpPeer <$> getTestPeer spidx
+
+getTestPeer :: Text -> CommandM TestPeer
+getTestPeer spidx = do
Just RunningServer {..} <- gets tsServer
- Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)
+ Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers)
return peer
-getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int
+getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int
getPeerIndex pmvar = do
peer <- asks svcPeer
- maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar)
+ maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar)
-pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a
+pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a
pairingAttributes _ out peers prefix = PairingAttributes
{ pairingHookRequest = return ()
@@ -267,6 +278,9 @@ commands = map (T.pack *** id)
, ("peer-drop", cmdPeerDrop)
, ("peer-list", cmdPeerList)
, ("test-message-send", cmdTestMessageSend)
+ , ("test-stream-open", cmdTestStreamOpen)
+ , ("test-stream-close", cmdTestStreamClose)
+ , ("test-stream-send", cmdTestStreamSend)
, ("local-state-get", cmdLocalStateGet)
, ("local-state-replace", cmdLocalStateReplace)
, ("local-state-wait", cmdLocalStateWait)
@@ -501,7 +515,20 @@ cmdStartServer = do
{ testMessageReceived = \obj otype len sref -> do
liftIO $ do
void $ store (headStorage h) obj
- outLine out $ unwords ["test-message-received", otype, len, sref]
+ outLine out $ unwords [ "test-message-received", otype, len, sref ]
+ , testStreamsReceived = \streams -> do
+ pidx <- getPeerIndex rsPeers
+ liftIO $ do
+ nums <- mapM getStreamReaderNumber streams
+ outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums
+ forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do
+ let go = readStreamPacket stream >>= \case
+ StreamData seqNum bytes -> do
+ outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ]
+ go
+ StreamClosed seqNum -> do
+ outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ]
+ go
}
sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
@@ -510,15 +537,23 @@ cmdStartServer = do
rsPeerThread <- liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange rsServer
- let printPeer (idx, p) = do
- params <- peerIdentity p >>= return . \case
+ let printPeer TestPeer {..} = do
+ params <- peerIdentity tpPeer >>= return . \case
PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
- _ -> [ "addr", show (peerAddress p) ]
- outLine out $ unwords $ [ "peer", show idx ] ++ params
+ _ -> [ "addr", show (peerAddress tpPeer) ]
+ outLine out $ unwords $ [ "peer", show tpIndex ] ++ params
- update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)])
- update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur
- | otherwise = fmap (p:) <$> update (nid, ps)
+ update ( tpIndex, [] ) = do
+ tpPeer <- return peer
+ tpStreamReaders <- newMVar []
+ tpStreamWriters <- newMVar []
+ let tp = TestPeer {..}
+ printPeer tp
+ return ( tpIndex + 1, [ tp ] )
+
+ update cur@( nid, p : ps )
+ | tpPeer p == peer = printPeer p >> return cur
+ | otherwise = fmap (p :) <$> update ( nid, ps )
modifyMVar_ rsPeers update
@@ -555,10 +590,10 @@ cmdPeerList = do
peers <- liftIO $ getCurrentPeerList rsServer
tpeers <- liftIO $ readMVar rsPeers
forM_ peers $ \peer -> do
- Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers
+ Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers
mbpid <- peerIdentity peer
cmdOut $ unwords $ concat
- [ [ "peer-list-item", show n ]
+ [ [ "peer-list-item", show (tpIndex tp) ]
, [ "addr", show (peerAddress peer) ]
, case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
_ -> []
@@ -575,6 +610,40 @@ cmdTestMessageSend = do
sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
+cmdTestStreamOpen :: Command
+cmdTestStreamOpen = do
+ spidx : rest <- asks tiParams
+ tp <- getTestPeer spidx
+ count <- case rest of
+ [] -> return 1
+ tcount : _ -> return $ read $ T.unpack tcount
+
+ out <- asks tiOutput
+ runPeerService (tpPeer tp) $ do
+ streams <- openTestStreams count
+ afterCommit $ do
+ nums <- mapM getStreamWriterNumber streams
+ modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams)
+ outLine out $ unwords $ "test-stream-open-done"
+ : T.unpack spidx
+ : map show nums
+
+cmdTestStreamClose :: Command
+cmdTestStreamClose = do
+ [ spidx, sid ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ closeStream stream
+ cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ]
+
+cmdTestStreamSend :: Command
+cmdTestStreamSend = do
+ [ spidx, sid, content ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ writeStream stream $ encodeUtf8 content
+ cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ]
+
cmdLocalStateGet :: Command
cmdLocalStateGet = do
h <- getHead
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index 8c58dee..c0be07d 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -1,8 +1,11 @@
module Test.Service (
TestMessage(..),
TestMessageAttributes(..),
+
+ openTestStreams,
) where
+import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as BL
@@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
data TestMessage = TestMessage (Stored Object)
data TestMessageAttributes = TestMessageAttributes
{ testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage ()
+ , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage ()
}
instance Storable TestMessage where
@@ -26,7 +31,10 @@ instance Service TestMessage where
serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8"
type ServiceAttributes TestMessage = TestMessageAttributes
- defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ())
+ defaultServiceAttributes _ = TestMessageAttributes
+ { testMessageReceived = \_ _ _ _ -> return ()
+ , testStreamsReceived = \_ -> return ()
+ }
serviceHandler smsg = do
let TestMessage sobj = fromStored smsg
@@ -36,3 +44,14 @@ instance Service TestMessage where
cb <- asks $ testMessageReceived . svcAttributes
cb obj otype len (show $ refDigest $ storedRef sobj)
_ -> return ()
+
+ streams <- receivedStreams
+ when (not $ null streams) $ do
+ cb <- asks $ testStreamsReceived . svcAttributes
+ cb streams
+
+
+openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ]
+openTestStreams count = do
+ replyPacket . TestMessage =<< mstore (Rec [])
+ replicateM count openStream
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 08f4e5c..0baeeb1 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -410,9 +410,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
@@ -527,9 +527,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -549,8 +547,8 @@ appendDistinct x (y:ys) | x == y = y : ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
+ -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
@@ -684,10 +682,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -812,7 +811,7 @@ notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer
receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
@@ -888,19 +887,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -940,10 +969,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService = runPeerServiceOn Nothing []
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -968,6 +997,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index f5183b3..025f52c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -23,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -288,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go cGlobalState streamNumber stream
+ )
where
go gs@GlobalState {..} streamNumber stream = do
@@ -361,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
+
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -403,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do
modifyTVar' cOutStreams $ filter ((snum /=) . fst)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
@@ -425,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index 50bded6..9030d04 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -113,10 +113,13 @@ data ServiceInput s = ServiceInput
, svcPeerIdentity :: UnifiedIdentity
, svcServer :: Server
, svcPrintOp :: String -> IO ()
+ , svcNewStreams :: [ RawStreamReader ]
}
-data ServiceReply s = ServiceReply (Either s (Stored s)) Bool
- | ServiceFinally (IO ())
+data ServiceReply s
+ = ServiceReply (Either s (Stored s)) Bool
+ | ServiceOpenStream (RawStreamWriter -> IO ())
+ | ServiceFinally (IO ())
data ServiceHandlerState s = ServiceHandlerState
{ svcValue :: ServiceState s
diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs
new file mode 100644
index 0000000..67df4d7
--- /dev/null
+++ b/src/Erebos/Service/Stream.hs
@@ -0,0 +1,74 @@
+module Erebos.Service.Stream (
+ StreamPacket(..),
+ StreamReader, getStreamReaderNumber,
+ StreamWriter, getStreamWriterNumber,
+ openStream, receivedStreams,
+ readStreamPacket, writeStreamPacket,
+ writeStream,
+ closeStream,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.Reader
+import Control.Monad.Writer
+
+import Data.ByteString (ByteString)
+import Data.Word
+
+import Erebos.Flow
+import Erebos.Network
+import Erebos.Network.Protocol
+import Erebos.Service
+
+
+data StreamReader = StreamReader RawStreamReader
+
+getStreamReaderNumber :: StreamReader -> IO Int
+getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
+
+data StreamWriter = StreamWriter (MVar StreamWriterData)
+
+data StreamWriterData = StreamWriterData
+ { swdStream :: RawStreamWriter
+ , swdSequence :: Maybe Word64
+ }
+
+getStreamWriterNumber :: StreamWriter -> IO Int
+getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
+
+
+openStream :: Service s => ServiceHandler s StreamWriter
+openStream = do
+ mvar <- liftIO newEmptyMVar
+ tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
+ return $ StreamWriter mvar
+
+receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
+receivedStreams = do
+ map StreamReader <$> asks svcNewStreams
+
+readStreamPacket :: StreamReader -> IO StreamPacket
+readStreamPacket (StreamReader stream) = do
+ readFlowIO (rsrFlow stream)
+
+writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
+writeStreamPacket (StreamWriter mvar) packet = do
+ withMVar mvar $ \swd -> do
+ writeFlowIO (rswFlow $ swdStream swd) packet
+
+writeStream :: StreamWriter -> ByteString -> IO ()
+writeStream (StreamWriter mvar) bytes = do
+ modifyMVar_ mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> do
+ writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
+ return swd { swdSequence = Just (seqNum + 1) }
+ Nothing -> do
+ fail "writeStream: stream closed"
+
+closeStream :: StreamWriter -> IO ()
+closeStream (StreamWriter mvar) = do
+ withMVar mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
+ Nothing -> fail "closeStream: stream already closed"
diff --git a/test/network.test b/test/network.test
index 52fcbee..0f49a1e 100644
--- a/test/network.test
+++ b/test/network.test
@@ -182,6 +182,58 @@ test ManyStreams:
expect /test-message-received blob 100[2-4] $ref/ from p2
+test ServiceStreams:
+ let services = "test"
+
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server services $services" to p1
+ send "start-server services $services" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ send "test-stream-open 1" to p1
+ expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1
+ expect /test-stream-open-from 1 $stream1/ from p2
+
+ send "test-stream-send 1 $stream1 hello" to p1
+ expect /test-stream-send-done 1 $stream1/ from p1
+ expect /test-stream-received 1 $stream1 0 hello/ from p2
+
+ send "test-stream-close 1 $stream1" to p1
+ expect /test-stream-close-done 1 $stream1/ from p1
+ expect /test-stream-closed-from 1 $stream1 1/ from p2
+
+ send "test-stream-open 1 8" to p2
+ expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8
+ expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1
+
+ let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ]
+ with p2:
+ for i in [ 1..20 ]:
+ for s in streams2:
+ send "test-stream-send 1 $s hello$i"
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-send-done 1 $s/
+ for s in streams2:
+ send "test-stream-close 1 $s"
+ for s in streams2:
+ expect /test-stream-close-done 1 $s/
+ with p1:
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-received 1 $s ${i-1} hello$i/
+ for s in streams2:
+ expect /test-stream-closed-from 1 $s 20/
+
+
test MultipleServiceRefs:
let services = "test"