From 25fe0ff7fd7d03e7f4108d361826cd91c038b89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 1 May 2024 20:21:55 +0200 Subject: Network: reuse stream numbers after closing --- src/Erebos/Network/Protocol.hs | 16 +++++++++++----- test/network.test | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index b79b105..59fcdca 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -217,11 +217,10 @@ connAddWriteStream conn@Connection {..} = do _ -> retry (,) <$> reservePacket conn <*> readFlow (sFlowOut stream) - let (plain, cont) = case msg of - StreamData {..} -> (stpData, True) - StreamClosed {} -> (BC.empty, False) + let (plain, cont, onAck) = case msg of + StreamData {..} -> (stpData, True, return ()) + StreamClosed {} -> (BC.empty, False, streamClosed conn streamNumber) -- TODO: send channel closed only after delivering all previous data packets - -- TODO: free channel number after delivering stream closed let secure = True plainAckedBy = [] mbReserved = Just reserved @@ -248,7 +247,10 @@ connAddWriteStream conn@Connection {..} = do case mbs of Just (bs, ackedBy) -> do - let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved + let mbReserved' = (\rs -> rs + { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) + , rsOnAck = rsOnAck rs >> onAck + }) <$> mbReserved sendBytes conn mbReserved' bs Nothing -> return () @@ -301,6 +303,10 @@ streamAccepted Connection {..} snum = atomically $ do x -> x Nothing -> return () +streamClosed :: Connection addr -> Word8 -> IO () +streamClosed Connection {..} snum = atomically $ do + modifyTVar' cOutStreams $ filter ((snum /=) . fst) + readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) readStreamToList stream = readFlowIO stream >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream diff --git a/test/network.test b/test/network.test index 3df7376..93f9743 100644 --- a/test/network.test +++ b/test/network.test @@ -135,8 +135,10 @@ test LargeData: for i in [0..10]: with p1: + # Create blob with (i * 1000) bytes send "store blob" for j in [1 .. i * 10]: + # 100 bytes each line send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" send "" expect /store-done (blake2#[0-9a-f]*)/ capture ref @@ -144,3 +146,33 @@ test LargeData: send "test-message-send 1 $ref" expect /test-message-send done/ expect /test-message-received blob ${i*1000} $ref/ from p2 + + +test ManyStreams: + spawn as p1 + spawn as p2 + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server" to p1 + send "start-server" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + for i in [0..100]: + with p1: + # Create blob with 1000 bytes + decimal i + send "store blob" + for j in [1 .. 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "$i" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref + + send "test-message-send 1 $ref" + expect /test-message-send done/ + expect /test-message-received blob 100[2-4] $ref/ from p2 -- cgit v1.2.3