summaryrefslogtreecommitdiff
path: root/src/Erebos/Network/Protocol.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r--src/Erebos/Network/Protocol.hs52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 26bd615..a009ad1 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
data Connection addr = Connection
{ cGlobalState :: GlobalState addr
, cAddress :: addr
- , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
- , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
+ , cDataUp :: Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ , cDataInternal :: Flow
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ (Maybe (Bool, TransportPacket PartialObject))
, cChannel :: TVar ChannelState
, cCookie :: TVar (Maybe Cookie)
, cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
@@ -192,7 +196,9 @@ instance Eq (Connection addr) where
connAddress :: Connection addr -> addr
connAddress = cAddress
-connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+connData :: Connection addr -> Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
connData = cDataUp
connGetChannel :: Connection addr -> STM ChannelState
@@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do
connClose :: Connection addr -> STM ()
connClose conn@Connection {..} = do
let GlobalState {..} = cGlobalState
- writeTVar cChannel ChannelClosed
- writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+ readTVar cChannel >>= \case
+ ChannelClosed -> return ()
+ _ -> do
+ writeTVar cChannel ChannelClosed
+ writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+ writeFlow cDataInternal Nothing
connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
@@ -396,6 +406,7 @@ data ChannelState = ChannelNone
data ReservedToSend = ReservedToSend
{ rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
, rsOnAck :: IO ()
+ , rsOnFail :: IO ()
}
data SentPacket = SentPacket
@@ -403,6 +414,7 @@ data SentPacket = SentPacket
, spRetryCount :: Int
, spAckedBy :: Maybe (TransportHeaderItem -> Bool)
, spOnAck :: IO ()
+ , spOnFail :: IO ()
, spData :: BC.ByteString
}
@@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
passUpIncoming :: GlobalState addr -> STM (IO ())
passUpIncoming GlobalState {..} = do
(Connection {..}, up) <- takeTMVar gNextUp
- writeFlow cDataInternal up
+ writeFlow cDataInternal (Just up)
return $ return ()
processIncoming :: GlobalState addr -> STM (IO ())
@@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa
reservePacket :: Connection addr -> STM ReservedToSend
-reservePacket Connection {..} = do
+reservePacket conn@Connection {..} = do
maxPackets <- readTVar cMaxInFlightPackets
reserved <- readTVar cReservedPackets
sent <- length <$> readTVar cSentPackets
@@ -698,7 +710,7 @@ reservePacket Connection {..} = do
retry
writeTVar cReservedPackets $ reserved + 1
- return $ ReservedToSend Nothing (return ())
+ return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
resendBytes Connection {..} reserved sp = do
@@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved
, spRetryCount = -1
, spAckedBy = rsAckedBy =<< reserved
, spOnAck = maybe (return ()) rsOnAck reserved
+ , spOnFail = maybe (return ()) rsOnFail reserved
, spData = bs
}
@@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do
sps@(_:_) -> return (last sps, init sps)
_ -> retry
let nextTry = spTime sp + fromNanoSecs 1000000000
- if now < nextTry
- then do
- nextTimeout <- readTVar gNextTimeout
- if nextTimeout <= now || nextTry < nextTimeout
- then do writeTVar gNextTimeout nextTry
- return $ return ()
- else retry
- else do
- reserved <- reservePacket conn
- writeTVar cSentPackets rest
- return $ resendBytes conn (Just reserved) sp
+ if | now < nextTry -> do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || nextTry < nextTimeout
+ then do writeTVar gNextTimeout nextTry
+ return $ return ()
+ else retry
+ | spRetryCount sp < 2 -> do
+ reserved <- reservePacket conn
+ writeTVar cSentPackets rest
+ return $ resendBytes conn (Just reserved) sp
+ | otherwise -> do
+ return $ spOnFail sp
let handleControlRequests = readFlow gControlFlow >>= \case
RequestConnection addr -> do