diff options
| -rw-r--r-- | src/Network/Protocol.hs | 21 | 
1 files changed, 15 insertions, 6 deletions
| diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index a1167e5..dc33296 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -126,6 +126,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState      , gConnections :: TVar [Connection addr]      , gDataFlow :: SymFlow (addr, ByteString)      , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) +    , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))      , gLog :: String -> STM ()      , gStorage :: PartialStorage      , gNowVar :: TVar TimeSpec @@ -203,6 +204,7 @@ erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)  erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do      gIdentity <- newTVarIO (initialIdentity, [])      gConnections <- newTVarIO [] +    gNextUp <- newEmptyTMVarIO      mStorage <- memoryStorage      gStorage <- derivePartialStorage mStorage @@ -229,8 +231,7 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do              race_ (waitTill next) waitForUpdate      race_ signalTimeouts $ forever $ join $ atomically $ -        processIncomming gs <|> processOutgoing gs - +        passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs  getConnection :: GlobalState addr -> addr -> STM (Connection addr) @@ -255,8 +256,16 @@ newConnection GlobalState {..} addr = do      writeTVar gConnections (conn : conns)      return conn -processIncomming :: GlobalState addr -> STM (IO ()) -processIncomming gs@GlobalState {..} = do +passUpIncoming :: GlobalState addr -> STM (IO ()) +passUpIncoming GlobalState {..} = do +    (Connection {..}, up) <- takeTMVar gNextUp +    writeFlow cDataInternal up +    return $ return () + +processIncoming :: GlobalState addr -> STM (IO ()) +processIncoming gs@GlobalState {..} = do +    guard =<< isEmptyTMVar gNextUp +      (addr, msg) <- readFlow gDataFlow      mbconn <- findConnection gs addr @@ -299,10 +308,10 @@ processIncomming gs@GlobalState {..} = do                  | hobj:content <- objs                  , Just header@(TransportHeader items) <- transportFromObject hobj                  -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case -                    Just (conn@Connection {..}, mbup) -> atomically $ do +                    Just (conn, mbup) -> atomically $ do                          processAcknowledgements gs conn items                          case mbup of -                            Just up -> writeFlow cDataInternal (secure, up) +                            Just up -> putTMVar gNextUp (conn, (secure, up))                              Nothing -> return ()                      Nothing -> return () |