diff options
Diffstat (limited to 'src/Erebos')
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 65 | 
1 files changed, 51 insertions, 14 deletions
| diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index e5eb652..f9bb53c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -168,6 +168,8 @@ data Connection addr = Connection      , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)      , cChannel :: TVar ChannelState      , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) +    , cMaxInFlightPackets :: TVar Int +    , cReservedPackets :: TVar Int      , cSentPackets :: TVar [SentPacket]      , cToAcknowledge :: TVar [Integer]      , cInStreams :: TVar [(Word8, Stream)] @@ -203,7 +205,9 @@ connAddWriteStream conn@Connection {..} = do      writeTVar cOutStreams outStreams'      let go = do -            msg <- atomically $ readFlow (sFlowOut stream) +            (reserved, msg) <- atomically $ (,) +                <$> reservePacket conn +                <*> readFlow (sFlowOut stream)              let (plain, cont) = case msg of                      StreamData {..} -> (stpData, True)                      StreamClosed {} -> (BC.empty, False) @@ -211,6 +215,7 @@ connAddWriteStream conn@Connection {..} = do                      -- TODO: free channel number after delivering stream closed              let secure = True                  plainAckedBy = [] +                mbReserved = Just reserved              mbch <- atomically (readTVar cChannel) >>= return . \case                  ChannelEstablished ch   -> Just ch @@ -233,7 +238,9 @@ connAddWriteStream conn@Connection {..} = do                          | otherwise -> return $ Just (plain, plainAckedBy)              case mbs of -                Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) +                Just (bs, ackedBy) -> do +                    let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved +                    sendBytes conn mbReserved' bs                  Nothing -> return ()              when cont go @@ -327,6 +334,9 @@ data ChannelState = ChannelNone                    | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel                    | ChannelEstablished Channel +data ReservedToSend = ReservedToSend +    { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) +    }  data SentPacket = SentPacket      { spTime :: TimeSpec @@ -399,6 +409,8 @@ newConnection cGlobalState@GlobalState {..} addr = do      (cDataUp, cDataInternal) <- newFlow      cChannel <- newTVar ChannelNone      cSecureOutQueue <- newTQueue +    cMaxInFlightPackets <- newTVar 4 +    cReservedPackets <- newTVar 0      cSentPackets <- newTVar []      cToAcknowledge <- newTVar []      cInStreams <- newTVar [] @@ -618,11 +630,27 @@ createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)  verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool  verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie -resendBytes :: Connection addr -> SentPacket -> IO () -resendBytes Connection {..} sp = do + +reservePacket :: Connection addr -> STM ReservedToSend +reservePacket Connection {..} = do +    maxPackets <- readTVar cMaxInFlightPackets +    reserved <- readTVar cReservedPackets +    sent <- length <$> readTVar cSentPackets + +    when (sent + reserved >= maxPackets) $ do +        retry + +    writeTVar cReservedPackets $ reserved + 1 +    return $ ReservedToSend Nothing + +resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () +resendBytes Connection {..} reserved sp = do      let GlobalState {..} = cGlobalState      now <- getTime MonotonicRaw      atomically $ do +        when (isJust reserved) $ do +            modifyTVar' cReservedPackets (subtract 1) +          when (isJust $ spAckedBy sp) $ do              modifyTVar' cSentPackets $ (:) sp                  { spTime = now @@ -630,12 +658,12 @@ resendBytes Connection {..} sp = do                  }          writeFlow gDataFlow (cAddress, spData sp) -sendBytes :: Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO () -sendBytes conn bs ackedBy = resendBytes conn +sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () +sendBytes conn reserved bs = resendBytes conn reserved      SentPacket          { spTime = undefined          , spRetryCount = -1 -        , spAckedBy = ackedBy +        , spAckedBy = rsAckedBy =<< reserved          , spData = bs          } @@ -650,18 +678,22 @@ processOutgoing gs@GlobalState {..} = do                      _                     -> Nothing              let checkOutstanding -                    | isJust mbch = readTQueue cSecureOutQueue +                    | isJust mbch = do +                        (,) <$> readTQueue cSecureOutQueue <*> (Just <$> reservePacket conn)                      | otherwise = retry +                checkDataInternal = do +                    (,) <$> readFlow cDataInternal <*> (Just <$> reservePacket conn) +                  checkAcknowledgements                      | isJust mbch = do                          acks <- readTVar cToAcknowledge                          if null acks then retry -                                     else return (True, TransportPacket (TransportHeader []) [], []) +                                     else return ((True, TransportPacket (TransportHeader []) [], []), Nothing)                      | otherwise = retry -            (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <- -                checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements +            ((secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy), mbReserved) <- +                checkOutstanding <|> checkDataInternal <|> checkAcknowledgements              when (isNothing mbch && secure) $ do                  writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) @@ -690,7 +722,9 @@ processOutgoing gs@GlobalState {..} = do                              | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy)                  case mbs of -                    Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) +                    Just (bs, ackedBy) -> do +                        let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved +                        sendBytes conn mbReserved' bs                      Nothing -> return ()      let retransmitPacket :: Connection addr -> STM (IO ()) @@ -708,8 +742,9 @@ processOutgoing gs@GlobalState {..} = do                             return $ return ()                     else retry                else do +                reserved <- reservePacket conn                  writeTVar cSentPackets rest -                return $ resendBytes conn sp +                return $ resendBytes conn (Just reserved) sp      let handleControlRequests = readFlow gControlFlow >>= \case              RequestConnection addr -> do @@ -717,6 +752,7 @@ processOutgoing gs@GlobalState {..} = do                  identity <- fst <$> readTVar gIdentity                  readTVar cChannel >>= \case                      ChannelNone -> do +                        reserved <- reservePacket conn                          let packet = BL.toStrict $ BL.concat                                  [ serializeObject $ transportToObject gStorage $ TransportHeader $                                      [ Initiation $ refDigest gInitConfig @@ -725,7 +761,8 @@ processOutgoing gs@GlobalState {..} = do                                  , lazyLoadBytes gInitConfig                                  ]                          writeTVar cChannel ChannelCookieWait -                        return $ sendBytes conn packet $ Just $ \case CookieSet {} -> True; _ -> False +                        let reserved' = reserved { rsAckedBy = Just $ \case CookieSet {} -> True; _ -> False } +                        return $ sendBytes conn (Just reserved') packet                      _ -> return $ return ()              SendAnnounce addr -> do |