summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-09-09 21:50:48 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-09-09 22:10:26 +0200
commit79ba31139f945895b3a080babd3d8de6384fd162 (patch)
tree0feb387e3dfcf8e08eb49a897c24edffc2d628e6
parent18e8ec1fca4bd1bdabedcc1ab969ecf4f8d4c26e (diff)
Protocol: process incoming only if upstream is free
-rw-r--r--src/Network/Protocol.hs21
1 files changed, 15 insertions, 6 deletions
diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs
index a1167e5..dc33296 100644
--- a/src/Network/Protocol.hs
+++ b/src/Network/Protocol.hs
@@ -126,6 +126,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
, gConnections :: TVar [Connection addr]
, gDataFlow :: SymFlow (addr, ByteString)
, gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)
+ , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
, gLog :: String -> STM ()
, gStorage :: PartialStorage
, gNowVar :: TVar TimeSpec
@@ -203,6 +204,7 @@ erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
gIdentity <- newTVarIO (initialIdentity, [])
gConnections <- newTVarIO []
+ gNextUp <- newEmptyTMVarIO
mStorage <- memoryStorage
gStorage <- derivePartialStorage mStorage
@@ -229,8 +231,7 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
race_ (waitTill next) waitForUpdate
race_ signalTimeouts $ forever $ join $ atomically $
- processIncomming gs <|> processOutgoing gs
-
+ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
getConnection :: GlobalState addr -> addr -> STM (Connection addr)
@@ -255,8 +256,16 @@ newConnection GlobalState {..} addr = do
writeTVar gConnections (conn : conns)
return conn
-processIncomming :: GlobalState addr -> STM (IO ())
-processIncomming gs@GlobalState {..} = do
+passUpIncoming :: GlobalState addr -> STM (IO ())
+passUpIncoming GlobalState {..} = do
+ (Connection {..}, up) <- takeTMVar gNextUp
+ writeFlow cDataInternal up
+ return $ return ()
+
+processIncoming :: GlobalState addr -> STM (IO ())
+processIncoming gs@GlobalState {..} = do
+ guard =<< isEmptyTMVar gNextUp
+
(addr, msg) <- readFlow gDataFlow
mbconn <- findConnection gs addr
@@ -299,10 +308,10 @@ processIncomming gs@GlobalState {..} = do
| hobj:content <- objs
, Just header@(TransportHeader items) <- transportFromObject hobj
-> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case
- Just (conn@Connection {..}, mbup) -> atomically $ do
+ Just (conn, mbup) -> atomically $ do
processAcknowledgements gs conn items
case mbup of
- Just up -> writeFlow cDataInternal (secure, up)
+ Just up -> putTMVar gNextUp (conn, (secure, up))
Nothing -> return ()
Nothing -> return ()