From f195f4a165b573d92d975b3e489372e88708e687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 20 Apr 2024 21:29:54 +0200 Subject: Network: packet reservation before sending --- src/Erebos/Network/Protocol.hs | 65 +++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index e5eb652..f9bb53c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -168,6 +168,8 @@ data Connection addr = Connection , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) , cChannel :: TVar ChannelState , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) + , cMaxInFlightPackets :: TVar Int + , cReservedPackets :: TVar Int , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] , cInStreams :: TVar [(Word8, Stream)] @@ -203,7 +205,9 @@ connAddWriteStream conn@Connection {..} = do writeTVar cOutStreams outStreams' let go = do - msg <- atomically $ readFlow (sFlowOut stream) + (reserved, msg) <- atomically $ (,) + <$> reservePacket conn + <*> readFlow (sFlowOut stream) let (plain, cont) = case msg of StreamData {..} -> (stpData, True) StreamClosed {} -> (BC.empty, False) @@ -211,6 +215,7 @@ connAddWriteStream conn@Connection {..} = do -- TODO: free channel number after delivering stream closed let secure = True plainAckedBy = [] + mbReserved = Just reserved mbch <- atomically (readTVar cChannel) >>= return . \case ChannelEstablished ch -> Just ch @@ -233,7 +238,9 @@ connAddWriteStream conn@Connection {..} = do | otherwise -> return $ Just (plain, plainAckedBy) case mbs of - Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) + Just (bs, ackedBy) -> do + let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved + sendBytes conn mbReserved' bs Nothing -> return () when cont go @@ -327,6 +334,9 @@ data ChannelState = ChannelNone | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel | ChannelEstablished Channel +data ReservedToSend = ReservedToSend + { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) + } data SentPacket = SentPacket { spTime :: TimeSpec @@ -399,6 +409,8 @@ newConnection cGlobalState@GlobalState {..} addr = do (cDataUp, cDataInternal) <- newFlow cChannel <- newTVar ChannelNone cSecureOutQueue <- newTQueue + cMaxInFlightPackets <- newTVar 4 + cReservedPackets <- newTVar 0 cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] cInStreams <- newTVar [] @@ -618,11 +630,27 @@ createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie -resendBytes :: Connection addr -> SentPacket -> IO () -resendBytes Connection {..} sp = do + +reservePacket :: Connection addr -> STM ReservedToSend +reservePacket Connection {..} = do + maxPackets <- readTVar cMaxInFlightPackets + reserved <- readTVar cReservedPackets + sent <- length <$> readTVar cSentPackets + + when (sent + reserved >= maxPackets) $ do + retry + + writeTVar cReservedPackets $ reserved + 1 + return $ ReservedToSend Nothing + +resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () +resendBytes Connection {..} reserved sp = do let GlobalState {..} = cGlobalState now <- getTime MonotonicRaw atomically $ do + when (isJust reserved) $ do + modifyTVar' cReservedPackets (subtract 1) + when (isJust $ spAckedBy sp) $ do modifyTVar' cSentPackets $ (:) sp { spTime = now @@ -630,12 +658,12 @@ resendBytes Connection {..} sp = do } writeFlow gDataFlow (cAddress, spData sp) -sendBytes :: Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO () -sendBytes conn bs ackedBy = resendBytes conn +sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () +sendBytes conn reserved bs = resendBytes conn reserved SentPacket { spTime = undefined , spRetryCount = -1 - , spAckedBy = ackedBy + , spAckedBy = rsAckedBy =<< reserved , spData = bs } @@ -650,18 +678,22 @@ processOutgoing gs@GlobalState {..} = do _ -> Nothing let checkOutstanding - | isJust mbch = readTQueue cSecureOutQueue + | isJust mbch = do + (,) <$> readTQueue cSecureOutQueue <*> (Just <$> reservePacket conn) | otherwise = retry + checkDataInternal = do + (,) <$> readFlow cDataInternal <*> (Just <$> reservePacket conn) + checkAcknowledgements | isJust mbch = do acks <- readTVar cToAcknowledge if null acks then retry - else return (True, TransportPacket (TransportHeader []) [], []) + else return ((True, TransportPacket (TransportHeader []) [], []), Nothing) | otherwise = retry - (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <- - checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements + ((secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy), mbReserved) <- + checkOutstanding <|> checkDataInternal <|> checkAcknowledgements when (isNothing mbch && secure) $ do writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) @@ -690,7 +722,9 @@ processOutgoing gs@GlobalState {..} = do | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy) case mbs of - Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) + Just (bs, ackedBy) -> do + let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved + sendBytes conn mbReserved' bs Nothing -> return () let retransmitPacket :: Connection addr -> STM (IO ()) @@ -708,8 +742,9 @@ processOutgoing gs@GlobalState {..} = do return $ return () else retry else do + reserved <- reservePacket conn writeTVar cSentPackets rest - return $ resendBytes conn sp + return $ resendBytes conn (Just reserved) sp let handleControlRequests = readFlow gControlFlow >>= \case RequestConnection addr -> do @@ -717,6 +752,7 @@ processOutgoing gs@GlobalState {..} = do identity <- fst <$> readTVar gIdentity readTVar cChannel >>= \case ChannelNone -> do + reserved <- reservePacket conn let packet = BL.toStrict $ BL.concat [ serializeObject $ transportToObject gStorage $ TransportHeader $ [ Initiation $ refDigest gInitConfig @@ -725,7 +761,8 @@ processOutgoing gs@GlobalState {..} = do , lazyLoadBytes gInitConfig ] writeTVar cChannel ChannelCookieWait - return $ sendBytes conn packet $ Just $ \case CookieSet {} -> True; _ -> False + let reserved' = reserved { rsAckedBy = Just $ \case CookieSet {} -> True; _ -> False } + return $ sendBytes conn (Just reserved') packet _ -> return $ return () SendAnnounce addr -> do -- cgit v1.2.3