From 25fe0ff7fd7d03e7f4108d361826cd91c038b89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 1 May 2024 20:21:55 +0200 Subject: Network: reuse stream numbers after closing --- src/Erebos/Network/Protocol.hs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src/Erebos/Network') diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index b79b105..59fcdca 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -217,11 +217,10 @@ connAddWriteStream conn@Connection {..} = do _ -> retry (,) <$> reservePacket conn <*> readFlow (sFlowOut stream) - let (plain, cont) = case msg of - StreamData {..} -> (stpData, True) - StreamClosed {} -> (BC.empty, False) + let (plain, cont, onAck) = case msg of + StreamData {..} -> (stpData, True, return ()) + StreamClosed {} -> (BC.empty, False, streamClosed conn streamNumber) -- TODO: send channel closed only after delivering all previous data packets - -- TODO: free channel number after delivering stream closed let secure = True plainAckedBy = [] mbReserved = Just reserved @@ -248,7 +247,10 @@ connAddWriteStream conn@Connection {..} = do case mbs of Just (bs, ackedBy) -> do - let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved + let mbReserved' = (\rs -> rs + { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) + , rsOnAck = rsOnAck rs >> onAck + }) <$> mbReserved sendBytes conn mbReserved' bs Nothing -> return () @@ -301,6 +303,10 @@ streamAccepted Connection {..} snum = atomically $ do x -> x Nothing -> return () +streamClosed :: Connection addr -> Word8 -> IO () +streamClosed Connection {..} snum = atomically $ do + modifyTVar' cOutStreams $ filter ((snum /=) . fst) + readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) readStreamToList stream = readFlowIO stream >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream -- cgit v1.2.3