From 79ba31139f945895b3a080babd3d8de6384fd162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 9 Sep 2023 21:50:48 +0200 Subject: Protocol: process incoming only if upstream is free --- src/Network/Protocol.hs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index a1167e5..dc33296 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -126,6 +126,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gConnections :: TVar [Connection addr] , gDataFlow :: SymFlow (addr, ByteString) , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) + , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () , gStorage :: PartialStorage , gNowVar :: TVar TimeSpec @@ -203,6 +204,7 @@ erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do gIdentity <- newTVarIO (initialIdentity, []) gConnections <- newTVarIO [] + gNextUp <- newEmptyTMVarIO mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage @@ -229,8 +231,7 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do race_ (waitTill next) waitForUpdate race_ signalTimeouts $ forever $ join $ atomically $ - processIncomming gs <|> processOutgoing gs - + passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs getConnection :: GlobalState addr -> addr -> STM (Connection addr) @@ -255,8 +256,16 @@ newConnection GlobalState {..} addr = do writeTVar gConnections (conn : conns) return conn -processIncomming :: GlobalState addr -> STM (IO ()) -processIncomming gs@GlobalState {..} = do +passUpIncoming :: GlobalState addr -> STM (IO ()) +passUpIncoming GlobalState {..} = do + (Connection {..}, up) <- takeTMVar gNextUp + writeFlow cDataInternal up + return $ return () + +processIncoming :: GlobalState addr -> STM (IO ()) +processIncoming gs@GlobalState {..} = do + guard =<< isEmptyTMVar gNextUp + (addr, msg) <- readFlow gDataFlow mbconn <- findConnection gs addr @@ -299,10 +308,10 @@ processIncomming gs@GlobalState {..} = do | hobj:content <- objs , Just header@(TransportHeader items) <- transportFromObject hobj -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case - Just (conn@Connection {..}, mbup) -> atomically $ do + Just (conn, mbup) -> atomically $ do processAcknowledgements gs conn items case mbup of - Just up -> writeFlow cDataInternal (secure, up) + Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () Nothing -> return () -- cgit v1.2.3