summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-05-01 20:21:55 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-05-02 21:57:20 +0200
commit25fe0ff7fd7d03e7f4108d361826cd91c038b89c (patch)
tree74e0c9cf2cca18d795e235e7045d50096d95c232
parent61f745b3c57e4fe78bea8f8a7a48923b364dd874 (diff)
Network: reuse stream numbers after closing
-rw-r--r--src/Erebos/Network/Protocol.hs16
-rw-r--r--test/network.test32
2 files changed, 43 insertions, 5 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index b79b105..59fcdca 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -217,11 +217,10 @@ connAddWriteStream conn@Connection {..} = do
_ -> retry
(,) <$> reservePacket conn
<*> readFlow (sFlowOut stream)
- let (plain, cont) = case msg of
- StreamData {..} -> (stpData, True)
- StreamClosed {} -> (BC.empty, False)
+ let (plain, cont, onAck) = case msg of
+ StreamData {..} -> (stpData, True, return ())
+ StreamClosed {} -> (BC.empty, False, streamClosed conn streamNumber)
-- TODO: send channel closed only after delivering all previous data packets
- -- TODO: free channel number after delivering stream closed
let secure = True
plainAckedBy = []
mbReserved = Just reserved
@@ -248,7 +247,10 @@ connAddWriteStream conn@Connection {..} = do
case mbs of
Just (bs, ackedBy) -> do
- let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved
+ let mbReserved' = (\rs -> rs
+ { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ , rsOnAck = rsOnAck rs >> onAck
+ }) <$> mbReserved
sendBytes conn mbReserved' bs
Nothing -> return ()
@@ -301,6 +303,10 @@ streamAccepted Connection {..} snum = atomically $ do
x -> x
Nothing -> return ()
+streamClosed :: Connection addr -> Word8 -> IO ()
+streamClosed Connection {..} snum = atomically $ do
+ modifyTVar' cOutStreams $ filter ((snum /=) . fst)
+
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
readStreamToList stream = readFlowIO stream >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
diff --git a/test/network.test b/test/network.test
index 3df7376..93f9743 100644
--- a/test/network.test
+++ b/test/network.test
@@ -135,8 +135,10 @@ test LargeData:
for i in [0..10]:
with p1:
+ # Create blob with (i * 1000) bytes
send "store blob"
for j in [1 .. i * 10]:
+ # 100 bytes each line
send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
send ""
expect /store-done (blake2#[0-9a-f]*)/ capture ref
@@ -144,3 +146,33 @@ test LargeData:
send "test-message-send 1 $ref"
expect /test-message-send done/
expect /test-message-received blob ${i*1000} $ref/ from p2
+
+
+test ManyStreams:
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server" to p1
+ send "start-server" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ for i in [0..100]:
+ with p1:
+ # Create blob with 1000 bytes + decimal i
+ send "store blob"
+ for j in [1 .. 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send "$i"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref
+
+ send "test-message-send 1 $ref"
+ expect /test-message-send done/
+ expect /test-message-received blob 100[2-4] $ref/ from p2