From fb074d4decf6a1406ad39737741a061e1b5bc2d1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Roman=20Smr=C5=BE?= <roman.smrz@seznam.cz>
Date: Sat, 1 Jun 2024 20:36:54 +0200
Subject: Drop peer on packet delivery failure

---
 main/Main.hs                   | 14 ++++++++----
 src/Erebos/Network.hs          | 17 +++++++++-----
 src/Erebos/Network/Protocol.hs | 52 +++++++++++++++++++++++++++---------------
 3 files changed, 53 insertions(+), 30 deletions(-)

diff --git a/main/Main.hs b/main/Main.hs
index 44e2f7b..df904a2 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -236,17 +236,21 @@ interactiveLoop st opts = runInputT inputSettings $ do
         peer <- getNextPeerChange server
         peerIdentity peer >>= \case
             pid@(PeerIdentityFull _) -> do
+                dropped <- isPeerDropped peer
                 let shown = showPeer pid $ peerAddress peer
-                let update [] = ([(peer, shown)], Nothing)
-                    update ((p,s):ps) | p == peer = ((peer, shown) : ps, Just s)
-                                      | otherwise = first ((p,s):) $ update ps
+                let update [] = ([(peer, shown)], (Nothing, "NEW"))
+                    update ((p,s):ps)
+                        | p == peer && dropped = (ps, (Nothing, "DEL"))
+                        | p == peer = ((peer, shown) : ps, (Just s, "UPD"))
+                        | otherwise = first ((p,s):) $ update ps
                 let ctxUpdate n [] = ([SelectedPeer peer], n)
                     ctxUpdate n (ctx:ctxs)
                         | SelectedPeer p <- ctx, p == peer = (ctx:ctxs, n)
                         | otherwise = first (ctx:) $ ctxUpdate (n + 1) ctxs
-                op <- modifyMVar peers (return . update)
+                (op, updateType) <- modifyMVar peers (return . update)
+                let updateType' = if dropped then "DEL" else updateType
                 idx <- modifyMVar contextOptions (return . ctxUpdate (1 :: Int))
-                when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> shown
+                when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown
             _ -> return ()
 
     let getInputLines prompt = do
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index f234971..41b6279 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -342,12 +342,17 @@ startServer opt serverOrigHead logd' serverServices = do
                                             _ -> return ()
                                 Nothing -> return ()
 
-                            forever $ do
-                                (secure, TransportPacket header objs) <- readFlowIO $ connData conn
-                                prefs <- forM objs $ storeObject $ peerInStorage peer
-                                identity <- readMVar serverIdentity_
-                                let svcs = map someServiceID serverServices
-                                handlePacket identity secure peer chanSvc svcs header prefs
+                            let peerLoop = readFlowIO (connData conn) >>= \case
+                                    Just (secure, TransportPacket header objs) -> do
+                                        prefs <- forM objs $ storeObject $ peerInStorage peer
+                                        identity <- readMVar serverIdentity_
+                                        let svcs = map someServiceID serverServices
+                                        handlePacket identity secure peer chanSvc svcs header prefs
+                                        peerLoop
+                                    Nothing -> do
+                                        dropPeer peer
+                                        atomically $ writeTChan serverChanPeer peer
+                            peerLoop
 
                     ReceivedAnnounce addr _ -> do
                         void $ serverPeer' server addr
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 26bd615..a009ad1 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
 data Connection addr = Connection
     { cGlobalState :: GlobalState addr
     , cAddress :: addr
-    , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
-    , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
+    , cDataUp :: Flow
+        (Maybe (Bool, TransportPacket PartialObject))
+        (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+    , cDataInternal :: Flow
+        (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+        (Maybe (Bool, TransportPacket PartialObject))
     , cChannel :: TVar ChannelState
     , cCookie :: TVar (Maybe Cookie)
     , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
@@ -192,7 +196,9 @@ instance Eq (Connection addr) where
 connAddress :: Connection addr -> addr
 connAddress = cAddress
 
-connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+connData :: Connection addr -> Flow
+    (Maybe (Bool, TransportPacket PartialObject))
+    (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
 connData = cDataUp
 
 connGetChannel :: Connection addr -> STM ChannelState
@@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do
 connClose :: Connection addr -> STM ()
 connClose conn@Connection {..} = do
     let GlobalState {..} = cGlobalState
-    writeTVar cChannel ChannelClosed
-    writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+    readTVar cChannel >>= \case
+        ChannelClosed -> return ()
+        _ -> do
+            writeTVar cChannel ChannelClosed
+            writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+            writeFlow cDataInternal Nothing
 
 connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
 connAddWriteStream conn@Connection {..} = do
@@ -396,6 +406,7 @@ data ChannelState = ChannelNone
 data ReservedToSend = ReservedToSend
     { rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
     , rsOnAck :: IO ()
+    , rsOnFail :: IO ()
     }
 
 data SentPacket = SentPacket
@@ -403,6 +414,7 @@ data SentPacket = SentPacket
     , spRetryCount :: Int
     , spAckedBy :: Maybe (TransportHeaderItem -> Bool)
     , spOnAck :: IO ()
+    , spOnFail :: IO ()
     , spData :: BC.ByteString
     }
 
@@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
 passUpIncoming :: GlobalState addr -> STM (IO ())
 passUpIncoming GlobalState {..} = do
     (Connection {..}, up) <- takeTMVar gNextUp
-    writeFlow cDataInternal up
+    writeFlow cDataInternal (Just up)
     return $ return ()
 
 processIncoming :: GlobalState addr -> STM (IO ())
@@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa
 
 
 reservePacket :: Connection addr -> STM ReservedToSend
-reservePacket Connection {..} = do
+reservePacket conn@Connection {..} = do
     maxPackets <- readTVar cMaxInFlightPackets
     reserved <- readTVar cReservedPackets
     sent <- length <$> readTVar cSentPackets
@@ -698,7 +710,7 @@ reservePacket Connection {..} = do
         retry
 
     writeTVar cReservedPackets $ reserved + 1
-    return $ ReservedToSend Nothing (return ())
+    return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
 
 resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
 resendBytes Connection {..} reserved sp = do
@@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved
         , spRetryCount = -1
         , spAckedBy = rsAckedBy =<< reserved
         , spOnAck = maybe (return ()) rsOnAck reserved
+        , spOnFail = maybe (return ()) rsOnFail reserved
         , spData = bs
         }
 
@@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do
                 sps@(_:_) -> return (last sps, init sps)
                 _         -> retry
             let nextTry = spTime sp + fromNanoSecs 1000000000
-            if now < nextTry
-              then do
-                nextTimeout <- readTVar gNextTimeout
-                if nextTimeout <= now || nextTry < nextTimeout
-                   then do writeTVar gNextTimeout nextTry
-                           return $ return ()
-                   else retry
-              else do
-                reserved <- reservePacket conn
-                writeTVar cSentPackets rest
-                return $ resendBytes conn (Just reserved) sp
+            if | now < nextTry -> do
+                    nextTimeout <- readTVar gNextTimeout
+                    if nextTimeout <= now || nextTry < nextTimeout
+                       then do writeTVar gNextTimeout nextTry
+                               return $ return ()
+                       else retry
+               | spRetryCount sp < 2 -> do
+                    reserved <- reservePacket conn
+                    writeTVar cSentPackets rest
+                    return $ resendBytes conn (Just reserved) sp
+               | otherwise -> do
+                    return $ spOnFail sp
 
     let handleControlRequests = readFlow gControlFlow >>= \case
             RequestConnection addr -> do
-- 
cgit v1.2.3