summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-05-04 21:24:17 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-05-05 11:50:00 +0200
commit1d50a136067b59ea8fc6b95b8b22a911f603a605 (patch)
tree91a85b108bb33541cc5e752f6e879708884283ce
parent175ebf1817dac859b3c109d47c9f3d6e963f4b3d (diff)
Network: wait with channel close after delivering all data
-rw-r--r--src/Erebos/Network/Protocol.hs25
1 files changed, 20 insertions, 5 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 59fcdca..3191f16 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -200,6 +200,7 @@ connAddWriteStream conn@Connection {..} = do
sState <- newTVar StreamOpening
(sFlowIn, sFlowOut) <- newFlow
sNextSequence <- newTVar 0
+ sWaitingForAck <- newTVar 0
let info = (n, Stream {..})
return (info, info : streams)
doInsert _ _ = throwError "all outbound streams in use"
@@ -217,10 +218,17 @@ connAddWriteStream conn@Connection {..} = do
_ -> retry
(,) <$> reservePacket conn
<*> readFlow (sFlowOut stream)
- let (plain, cont, onAck) = case msg of
- StreamData {..} -> (stpData, True, return ())
- StreamClosed {} -> (BC.empty, False, streamClosed conn streamNumber)
- -- TODO: send channel closed only after delivering all previous data packets
+
+ (plain, cont, onAck) <- case msg of
+ StreamData {..} -> do
+ return (stpData, True, return ())
+ StreamClosed {} -> do
+ atomically $ do
+ -- wait for ack on all sent stream data
+ waits <- readTVar (sWaitingForAck stream)
+ when (waits > 0) retry
+ return (BC.empty, False, streamClosed conn streamNumber)
+
let secure = True
plainAckedBy = []
mbReserved = Just reserved
@@ -247,9 +255,14 @@ connAddWriteStream conn@Connection {..} = do
case mbs of
Just (bs, ackedBy) -> do
+ atomically $ do
+ modifyTVar' (sWaitingForAck stream) (+ 1)
let mbReserved' = (\rs -> rs
{ rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy)
- , rsOnAck = rsOnAck rs >> onAck
+ , rsOnAck = do
+ rsOnAck rs
+ onAck
+ atomically $ modifyTVar' (sWaitingForAck stream) (subtract 1)
}) <$> mbReserved
sendBytes conn mbReserved' bs
Nothing -> return ()
@@ -266,6 +279,7 @@ connAddReadStream Connection {..} streamNumber = do
sState <- newTVar StreamRunning
(sFlowIn, sFlowOut) <- newFlow
sNextSequence <- newTVar 0
+ sWaitingForAck <- newTVar 0
let stream = Stream {..}
return (stream, (streamNumber, stream) : streams)
(stream, inStreams') <- doInsert inStreams
@@ -281,6 +295,7 @@ data Stream = Stream
, sFlowIn :: Flow Void StreamPacket
, sFlowOut :: Flow StreamPacket Void
, sNextSequence :: TVar Word64
+ , sWaitingForAck :: TVar Word64
}
data StreamState = StreamOpening | StreamRunning