diff options
Diffstat (limited to 'src/Erebos/Network')
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 52 | 
1 files changed, 33 insertions, 19 deletions
| diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 26bd615..a009ad1 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState  data Connection addr = Connection      { cGlobalState :: GlobalState addr      , cAddress :: addr -    , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) -    , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) +    , cDataUp :: Flow +        (Maybe (Bool, TransportPacket PartialObject)) +        (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +    , cDataInternal :: Flow +        (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +        (Maybe (Bool, TransportPacket PartialObject))      , cChannel :: TVar ChannelState      , cCookie :: TVar (Maybe Cookie)      , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -192,7 +196,9 @@ instance Eq (Connection addr) where  connAddress :: Connection addr -> addr  connAddress = cAddress -connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +connData :: Connection addr -> Flow +    (Maybe (Bool, TransportPacket PartialObject)) +    (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])  connData = cDataUp  connGetChannel :: Connection addr -> STM ChannelState @@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do  connClose :: Connection addr -> STM ()  connClose conn@Connection {..} = do      let GlobalState {..} = cGlobalState -    writeTVar cChannel ChannelClosed -    writeTVar gConnections . filter (/=conn) =<< readTVar gConnections +    readTVar cChannel >>= \case +        ChannelClosed -> return () +        _ -> do +            writeTVar cChannel ChannelClosed +            writeTVar gConnections . filter (/=conn) =<< readTVar gConnections +            writeFlow cDataInternal Nothing  connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))  connAddWriteStream conn@Connection {..} = do @@ -396,6 +406,7 @@ data ChannelState = ChannelNone  data ReservedToSend = ReservedToSend      { rsAckedBy :: Maybe (TransportHeaderItem -> Bool)      , rsOnAck :: IO () +    , rsOnFail :: IO ()      }  data SentPacket = SentPacket @@ -403,6 +414,7 @@ data SentPacket = SentPacket      , spRetryCount :: Int      , spAckedBy :: Maybe (TransportHeaderItem -> Bool)      , spOnAck :: IO () +    , spOnFail :: IO ()      , spData :: BC.ByteString      } @@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do  passUpIncoming :: GlobalState addr -> STM (IO ())  passUpIncoming GlobalState {..} = do      (Connection {..}, up) <- takeTMVar gNextUp -    writeFlow cDataInternal up +    writeFlow cDataInternal (Just up)      return $ return ()  processIncoming :: GlobalState addr -> STM (IO ()) @@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa  reservePacket :: Connection addr -> STM ReservedToSend -reservePacket Connection {..} = do +reservePacket conn@Connection {..} = do      maxPackets <- readTVar cMaxInFlightPackets      reserved <- readTVar cReservedPackets      sent <- length <$> readTVar cSentPackets @@ -698,7 +710,7 @@ reservePacket Connection {..} = do          retry      writeTVar cReservedPackets $ reserved + 1 -    return $ ReservedToSend Nothing (return ()) +    return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)  resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()  resendBytes Connection {..} reserved sp = do @@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved          , spRetryCount = -1          , spAckedBy = rsAckedBy =<< reserved          , spOnAck = maybe (return ()) rsOnAck reserved +        , spOnFail = maybe (return ()) rsOnFail reserved          , spData = bs          } @@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do                  sps@(_:_) -> return (last sps, init sps)                  _         -> retry              let nextTry = spTime sp + fromNanoSecs 1000000000 -            if now < nextTry -              then do -                nextTimeout <- readTVar gNextTimeout -                if nextTimeout <= now || nextTry < nextTimeout -                   then do writeTVar gNextTimeout nextTry -                           return $ return () -                   else retry -              else do -                reserved <- reservePacket conn -                writeTVar cSentPackets rest -                return $ resendBytes conn (Just reserved) sp +            if | now < nextTry -> do +                    nextTimeout <- readTVar gNextTimeout +                    if nextTimeout <= now || nextTry < nextTimeout +                       then do writeTVar gNextTimeout nextTry +                               return $ return () +                       else retry +               | spRetryCount sp < 2 -> do +                    reserved <- reservePacket conn +                    writeTVar cSentPackets rest +                    return $ resendBytes conn (Just reserved) sp +               | otherwise -> do +                    return $ spOnFail sp      let handleControlRequests = readFlow gControlFlow >>= \case              RequestConnection addr -> do |