From fb074d4decf6a1406ad39737741a061e1b5bc2d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 1 Jun 2024 20:36:54 +0200 Subject: Drop peer on packet delivery failure --- main/Main.hs | 14 ++++++++---- src/Erebos/Network.hs | 17 +++++++++----- src/Erebos/Network/Protocol.hs | 52 +++++++++++++++++++++++++++--------------- 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/main/Main.hs b/main/Main.hs index 44e2f7b..df904a2 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -236,17 +236,21 @@ interactiveLoop st opts = runInputT inputSettings $ do peer <- getNextPeerChange server peerIdentity peer >>= \case pid@(PeerIdentityFull _) -> do + dropped <- isPeerDropped peer let shown = showPeer pid $ peerAddress peer - let update [] = ([(peer, shown)], Nothing) - update ((p,s):ps) | p == peer = ((peer, shown) : ps, Just s) - | otherwise = first ((p,s):) $ update ps + let update [] = ([(peer, shown)], (Nothing, "NEW")) + update ((p,s):ps) + | p == peer && dropped = (ps, (Nothing, "DEL")) + | p == peer = ((peer, shown) : ps, (Just s, "UPD")) + | otherwise = first ((p,s):) $ update ps let ctxUpdate n [] = ([SelectedPeer peer], n) ctxUpdate n (ctx:ctxs) | SelectedPeer p <- ctx, p == peer = (ctx:ctxs, n) | otherwise = first (ctx:) $ ctxUpdate (n + 1) ctxs - op <- modifyMVar peers (return . update) + (op, updateType) <- modifyMVar peers (return . update) + let updateType' = if dropped then "DEL" else updateType idx <- modifyMVar contextOptions (return . ctxUpdate (1 :: Int)) - when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> shown + when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown _ -> return () let getInputLines prompt = do diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index f234971..41b6279 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -342,12 +342,17 @@ startServer opt serverOrigHead logd' serverServices = do _ -> return () Nothing -> return () - forever $ do - (secure, TransportPacket header objs) <- readFlowIO $ connData conn - prefs <- forM objs $ storeObject $ peerInStorage peer - identity <- readMVar serverIdentity_ - let svcs = map someServiceID serverServices - handlePacket identity secure peer chanSvc svcs header prefs + let peerLoop = readFlowIO (connData conn) >>= \case + Just (secure, TransportPacket header objs) -> do + prefs <- forM objs $ storeObject $ peerInStorage peer + identity <- readMVar serverIdentity_ + let svcs = map someServiceID serverServices + handlePacket identity secure peer chanSvc svcs header prefs + peerLoop + Nothing -> do + dropPeer peer + atomically $ writeTChan serverChanPeer peer + peerLoop ReceivedAnnounce addr _ -> do void $ serverPeer' server addr diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 26bd615..a009ad1 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState data Connection addr = Connection { cGlobalState :: GlobalState addr , cAddress :: addr - , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) - , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) + , cDataUp :: Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + , cDataInternal :: Flow + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + (Maybe (Bool, TransportPacket PartialObject)) , cChannel :: TVar ChannelState , cCookie :: TVar (Maybe Cookie) , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -192,7 +196,9 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress -connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +connData :: Connection addr -> Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) connData = cDataUp connGetChannel :: Connection addr -> STM ChannelState @@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do connClose :: Connection addr -> STM () connClose conn@Connection {..} = do let GlobalState {..} = cGlobalState - writeTVar cChannel ChannelClosed - writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + readTVar cChannel >>= \case + ChannelClosed -> return () + _ -> do + writeTVar cChannel ChannelClosed + writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + writeFlow cDataInternal Nothing connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do @@ -396,6 +406,7 @@ data ChannelState = ChannelNone data ReservedToSend = ReservedToSend { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) , rsOnAck :: IO () + , rsOnFail :: IO () } data SentPacket = SentPacket @@ -403,6 +414,7 @@ data SentPacket = SentPacket , spRetryCount :: Int , spAckedBy :: Maybe (TransportHeaderItem -> Bool) , spOnAck :: IO () + , spOnFail :: IO () , spData :: BC.ByteString } @@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do passUpIncoming :: GlobalState addr -> STM (IO ()) passUpIncoming GlobalState {..} = do (Connection {..}, up) <- takeTMVar gNextUp - writeFlow cDataInternal up + writeFlow cDataInternal (Just up) return $ return () processIncoming :: GlobalState addr -> STM (IO ()) @@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa reservePacket :: Connection addr -> STM ReservedToSend -reservePacket Connection {..} = do +reservePacket conn@Connection {..} = do maxPackets <- readTVar cMaxInFlightPackets reserved <- readTVar cReservedPackets sent <- length <$> readTVar cSentPackets @@ -698,7 +710,7 @@ reservePacket Connection {..} = do retry writeTVar cReservedPackets $ reserved + 1 - return $ ReservedToSend Nothing (return ()) + return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () resendBytes Connection {..} reserved sp = do @@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved , spRetryCount = -1 , spAckedBy = rsAckedBy =<< reserved , spOnAck = maybe (return ()) rsOnAck reserved + , spOnFail = maybe (return ()) rsOnFail reserved , spData = bs } @@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do sps@(_:_) -> return (last sps, init sps) _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 - if now < nextTry - then do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry - else do - reserved <- reservePacket conn - writeTVar cSentPackets rest - return $ resendBytes conn (Just reserved) sp + if | now < nextTry -> do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || nextTry < nextTimeout + then do writeTVar gNextTimeout nextTry + return $ return () + else retry + | spRetryCount sp < 2 -> do + reserved <- reservePacket conn + writeTVar cSentPackets rest + return $ resendBytes conn (Just reserved) sp + | otherwise -> do + return $ spOnFail sp let handleControlRequests = readFlow gControlFlow >>= \case RequestConnection addr -> do -- cgit v1.2.3