summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-04-20 21:29:54 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-04-22 21:54:00 +0200
commitf195f4a165b573d92d975b3e489372e88708e687 (patch)
tree637e315d3042726793032a39478cf674478c211e
parent1d3da93dad6410b2416c1cf3ba1fe19887ccf5f4 (diff)
Network: packet reservation before sending
-rw-r--r--src/Erebos/Network/Protocol.hs65
1 files changed, 51 insertions, 14 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index e5eb652..f9bb53c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -168,6 +168,8 @@ data Connection addr = Connection
, cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
, cChannel :: TVar ChannelState
, cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem])
+ , cMaxInFlightPackets :: TVar Int
+ , cReservedPackets :: TVar Int
, cSentPackets :: TVar [SentPacket]
, cToAcknowledge :: TVar [Integer]
, cInStreams :: TVar [(Word8, Stream)]
@@ -203,7 +205,9 @@ connAddWriteStream conn@Connection {..} = do
writeTVar cOutStreams outStreams'
let go = do
- msg <- atomically $ readFlow (sFlowOut stream)
+ (reserved, msg) <- atomically $ (,)
+ <$> reservePacket conn
+ <*> readFlow (sFlowOut stream)
let (plain, cont) = case msg of
StreamData {..} -> (stpData, True)
StreamClosed {} -> (BC.empty, False)
@@ -211,6 +215,7 @@ connAddWriteStream conn@Connection {..} = do
-- TODO: free channel number after delivering stream closed
let secure = True
plainAckedBy = []
+ mbReserved = Just reserved
mbch <- atomically (readTVar cChannel) >>= return . \case
ChannelEstablished ch -> Just ch
@@ -233,7 +238,9 @@ connAddWriteStream conn@Connection {..} = do
| otherwise -> return $ Just (plain, plainAckedBy)
case mbs of
- Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ Just (bs, ackedBy) -> do
+ let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved
+ sendBytes conn mbReserved' bs
Nothing -> return ()
when cont go
@@ -327,6 +334,9 @@ data ChannelState = ChannelNone
| ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel
| ChannelEstablished Channel
+data ReservedToSend = ReservedToSend
+ { rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
+ }
data SentPacket = SentPacket
{ spTime :: TimeSpec
@@ -399,6 +409,8 @@ newConnection cGlobalState@GlobalState {..} addr = do
(cDataUp, cDataInternal) <- newFlow
cChannel <- newTVar ChannelNone
cSecureOutQueue <- newTQueue
+ cMaxInFlightPackets <- newTVar 4
+ cReservedPackets <- newTVar 0
cSentPackets <- newTVar []
cToAcknowledge <- newTVar []
cInStreams <- newTVar []
@@ -618,11 +630,27 @@ createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)
verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool
verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie
-resendBytes :: Connection addr -> SentPacket -> IO ()
-resendBytes Connection {..} sp = do
+
+reservePacket :: Connection addr -> STM ReservedToSend
+reservePacket Connection {..} = do
+ maxPackets <- readTVar cMaxInFlightPackets
+ reserved <- readTVar cReservedPackets
+ sent <- length <$> readTVar cSentPackets
+
+ when (sent + reserved >= maxPackets) $ do
+ retry
+
+ writeTVar cReservedPackets $ reserved + 1
+ return $ ReservedToSend Nothing
+
+resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
+resendBytes Connection {..} reserved sp = do
let GlobalState {..} = cGlobalState
now <- getTime MonotonicRaw
atomically $ do
+ when (isJust reserved) $ do
+ modifyTVar' cReservedPackets (subtract 1)
+
when (isJust $ spAckedBy sp) $ do
modifyTVar' cSentPackets $ (:) sp
{ spTime = now
@@ -630,12 +658,12 @@ resendBytes Connection {..} sp = do
}
writeFlow gDataFlow (cAddress, spData sp)
-sendBytes :: Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO ()
-sendBytes conn bs ackedBy = resendBytes conn
+sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()
+sendBytes conn reserved bs = resendBytes conn reserved
SentPacket
{ spTime = undefined
, spRetryCount = -1
- , spAckedBy = ackedBy
+ , spAckedBy = rsAckedBy =<< reserved
, spData = bs
}
@@ -650,18 +678,22 @@ processOutgoing gs@GlobalState {..} = do
_ -> Nothing
let checkOutstanding
- | isJust mbch = readTQueue cSecureOutQueue
+ | isJust mbch = do
+ (,) <$> readTQueue cSecureOutQueue <*> (Just <$> reservePacket conn)
| otherwise = retry
+ checkDataInternal = do
+ (,) <$> readFlow cDataInternal <*> (Just <$> reservePacket conn)
+
checkAcknowledgements
| isJust mbch = do
acks <- readTVar cToAcknowledge
if null acks then retry
- else return (True, TransportPacket (TransportHeader []) [], [])
+ else return ((True, TransportPacket (TransportHeader []) [], []), Nothing)
| otherwise = retry
- (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <-
- checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements
+ ((secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy), mbReserved) <-
+ checkOutstanding <|> checkDataInternal <|> checkAcknowledgements
when (isNothing mbch && secure) $ do
writeTQueue cSecureOutQueue (secure, packet, plainAckedBy)
@@ -690,7 +722,9 @@ processOutgoing gs@GlobalState {..} = do
| otherwise -> return $ Just (BL.toStrict plain, plainAckedBy)
case mbs of
- Just (bs, ackedBy) -> sendBytes conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ Just (bs, ackedBy) -> do
+ let mbReserved' = (\rs -> rs { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) }) <$> mbReserved
+ sendBytes conn mbReserved' bs
Nothing -> return ()
let retransmitPacket :: Connection addr -> STM (IO ())
@@ -708,8 +742,9 @@ processOutgoing gs@GlobalState {..} = do
return $ return ()
else retry
else do
+ reserved <- reservePacket conn
writeTVar cSentPackets rest
- return $ resendBytes conn sp
+ return $ resendBytes conn (Just reserved) sp
let handleControlRequests = readFlow gControlFlow >>= \case
RequestConnection addr -> do
@@ -717,6 +752,7 @@ processOutgoing gs@GlobalState {..} = do
identity <- fst <$> readTVar gIdentity
readTVar cChannel >>= \case
ChannelNone -> do
+ reserved <- reservePacket conn
let packet = BL.toStrict $ BL.concat
[ serializeObject $ transportToObject gStorage $ TransportHeader $
[ Initiation $ refDigest gInitConfig
@@ -725,7 +761,8 @@ processOutgoing gs@GlobalState {..} = do
, lazyLoadBytes gInitConfig
]
writeTVar cChannel ChannelCookieWait
- return $ sendBytes conn packet $ Just $ \case CookieSet {} -> True; _ -> False
+ let reserved' = reserved { rsAckedBy = Just $ \case CookieSet {} -> True; _ -> False }
+ return $ sendBytes conn (Just reserved') packet
_ -> return $ return ()
SendAnnounce addr -> do