From a03a457538dd990db34c71facd92ea3173ff6025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Tue, 15 Jul 2025 21:28:30 +0200 Subject: Test output stream opening and closing --- src/Erebos/Network/Protocol.hs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) (limited to 'src/Erebos/Network/Protocol.hs') diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index bd640ac..f67e296 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -250,6 +250,12 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress +showConnAddress :: forall addr. Connection addr -> String +showConnAddress Connection {..} = helper cGlobalState cAddress + where + helper :: GlobalState addr -> addr -> String + helper GlobalState {} = show + connData :: Connection addr -> Flow (Maybe (Bool, TransportPacket PartialObject)) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -274,6 +280,7 @@ connClose conn@Connection {..} = do connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do + let GlobalState {..} = cGlobalState outStreams <- readTVar cOutStreams let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) doInsert n (s@(n', _) : rest) | n == n' = @@ -290,14 +297,16 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' + lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams') return ( StreamOpen streamNumber , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) - , go cGlobalState streamNumber stream + , go streamNumber stream ) where - go gs@GlobalState {..} streamNumber stream = do + go streamNumber stream = do + let GlobalState {..} = cGlobalState (reserved, msg) <- atomically $ do readTVar (sState stream) >>= \case StreamRunning -> return () @@ -309,6 +318,8 @@ connAddWriteStream conn@Connection {..} = do StreamData {..} -> do return (stpData, True, return ()) StreamClosed {} -> do + atomically $ do + gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber atomically $ do -- wait for ack on all sent stream data waits <- readTVar (sWaitingForAck stream) @@ -353,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do sendBytes conn mbReserved' bs Nothing -> return () - when cont $ go gs streamNumber stream + when cont $ go streamNumber stream connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader connAddReadStream Connection {..} streamNumber = do @@ -412,8 +423,10 @@ streamAccepted Connection {..} snum = atomically $ do Nothing -> return () streamClosed :: Connection addr -> Word8 -> IO () -streamClosed Connection {..} snum = atomically $ do - modifyTVar' cOutStreams $ filter ((snum /=) . fst) +streamClosed conn@Connection {..} snum = atomically $ do + streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams + writeTVar cOutStreams streams + gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case @@ -563,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cOutStreams <- newTVar [] let conn = Connection {..} + gTestLog $ "net-conn-new " <> show cAddress writeTVar gConnections (conn : conns) return conn -- cgit v1.2.3