diff options
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 26bd615..a009ad1 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState data Connection addr = Connection { cGlobalState :: GlobalState addr , cAddress :: addr - , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) - , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) + , cDataUp :: Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + , cDataInternal :: Flow + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + (Maybe (Bool, TransportPacket PartialObject)) , cChannel :: TVar ChannelState , cCookie :: TVar (Maybe Cookie) , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -192,7 +196,9 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress -connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +connData :: Connection addr -> Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) connData = cDataUp connGetChannel :: Connection addr -> STM ChannelState @@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do connClose :: Connection addr -> STM () connClose conn@Connection {..} = do let GlobalState {..} = cGlobalState - writeTVar cChannel ChannelClosed - writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + readTVar cChannel >>= \case + ChannelClosed -> return () + _ -> do + writeTVar cChannel ChannelClosed + writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + writeFlow cDataInternal Nothing connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do @@ -396,6 +406,7 @@ data ChannelState = ChannelNone data ReservedToSend = ReservedToSend { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) , rsOnAck :: IO () + , rsOnFail :: IO () } data SentPacket = SentPacket @@ -403,6 +414,7 @@ data SentPacket = SentPacket , spRetryCount :: Int , spAckedBy :: Maybe (TransportHeaderItem -> Bool) , spOnAck :: IO () + , spOnFail :: IO () , spData :: BC.ByteString } @@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do passUpIncoming :: GlobalState addr -> STM (IO ()) passUpIncoming GlobalState {..} = do (Connection {..}, up) <- takeTMVar gNextUp - writeFlow cDataInternal up + writeFlow cDataInternal (Just up) return $ return () processIncoming :: GlobalState addr -> STM (IO ()) @@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa reservePacket :: Connection addr -> STM ReservedToSend -reservePacket Connection {..} = do +reservePacket conn@Connection {..} = do maxPackets <- readTVar cMaxInFlightPackets reserved <- readTVar cReservedPackets sent <- length <$> readTVar cSentPackets @@ -698,7 +710,7 @@ reservePacket Connection {..} = do retry writeTVar cReservedPackets $ reserved + 1 - return $ ReservedToSend Nothing (return ()) + return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () resendBytes Connection {..} reserved sp = do @@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved , spRetryCount = -1 , spAckedBy = rsAckedBy =<< reserved , spOnAck = maybe (return ()) rsOnAck reserved + , spOnFail = maybe (return ()) rsOnFail reserved , spData = bs } @@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do sps@(_:_) -> return (last sps, init sps) _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 - if now < nextTry - then do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry - else do - reserved <- reservePacket conn - writeTVar cSentPackets rest - return $ resendBytes conn (Just reserved) sp + if | now < nextTry -> do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || nextTry < nextTimeout + then do writeTVar gNextTimeout nextTry + return $ return () + else retry + | spRetryCount sp < 2 -> do + reserved <- reservePacket conn + writeTVar cSentPackets rest + return $ resendBytes conn (Just reserved) sp + | otherwise -> do + return $ spOnFail sp let handleControlRequests = readFlow gControlFlow >>= \case RequestConnection addr -> do |