summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/Main.hs14
-rw-r--r--src/Erebos/Network.hs17
-rw-r--r--src/Erebos/Network/Protocol.hs52
3 files changed, 53 insertions, 30 deletions
diff --git a/main/Main.hs b/main/Main.hs
index 44e2f7b..df904a2 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -236,17 +236,21 @@ interactiveLoop st opts = runInputT inputSettings $ do
peer <- getNextPeerChange server
peerIdentity peer >>= \case
pid@(PeerIdentityFull _) -> do
+ dropped <- isPeerDropped peer
let shown = showPeer pid $ peerAddress peer
- let update [] = ([(peer, shown)], Nothing)
- update ((p,s):ps) | p == peer = ((peer, shown) : ps, Just s)
- | otherwise = first ((p,s):) $ update ps
+ let update [] = ([(peer, shown)], (Nothing, "NEW"))
+ update ((p,s):ps)
+ | p == peer && dropped = (ps, (Nothing, "DEL"))
+ | p == peer = ((peer, shown) : ps, (Just s, "UPD"))
+ | otherwise = first ((p,s):) $ update ps
let ctxUpdate n [] = ([SelectedPeer peer], n)
ctxUpdate n (ctx:ctxs)
| SelectedPeer p <- ctx, p == peer = (ctx:ctxs, n)
| otherwise = first (ctx:) $ ctxUpdate (n + 1) ctxs
- op <- modifyMVar peers (return . update)
+ (op, updateType) <- modifyMVar peers (return . update)
+ let updateType' = if dropped then "DEL" else updateType
idx <- modifyMVar contextOptions (return . ctxUpdate (1 :: Int))
- when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> shown
+ when (Just shown /= op) $ extPrint $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown
_ -> return ()
let getInputLines prompt = do
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index f234971..41b6279 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -342,12 +342,17 @@ startServer opt serverOrigHead logd' serverServices = do
_ -> return ()
Nothing -> return ()
- forever $ do
- (secure, TransportPacket header objs) <- readFlowIO $ connData conn
- prefs <- forM objs $ storeObject $ peerInStorage peer
- identity <- readMVar serverIdentity_
- let svcs = map someServiceID serverServices
- handlePacket identity secure peer chanSvc svcs header prefs
+ let peerLoop = readFlowIO (connData conn) >>= \case
+ Just (secure, TransportPacket header objs) -> do
+ prefs <- forM objs $ storeObject $ peerInStorage peer
+ identity <- readMVar serverIdentity_
+ let svcs = map someServiceID serverServices
+ handlePacket identity secure peer chanSvc svcs header prefs
+ peerLoop
+ Nothing -> do
+ dropPeer peer
+ atomically $ writeTChan serverChanPeer peer
+ peerLoop
ReceivedAnnounce addr _ -> do
void $ serverPeer' server addr
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 26bd615..a009ad1 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -173,8 +173,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
data Connection addr = Connection
{ cGlobalState :: GlobalState addr
, cAddress :: addr
- , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
- , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
+ , cDataUp :: Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ , cDataInternal :: Flow
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ (Maybe (Bool, TransportPacket PartialObject))
, cChannel :: TVar ChannelState
, cCookie :: TVar (Maybe Cookie)
, cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
@@ -192,7 +196,9 @@ instance Eq (Connection addr) where
connAddress :: Connection addr -> addr
connAddress = cAddress
-connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+connData :: Connection addr -> Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
connData = cDataUp
connGetChannel :: Connection addr -> STM ChannelState
@@ -205,8 +211,12 @@ connSetChannel Connection {..} ch = do
connClose :: Connection addr -> STM ()
connClose conn@Connection {..} = do
let GlobalState {..} = cGlobalState
- writeTVar cChannel ChannelClosed
- writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+ readTVar cChannel >>= \case
+ ChannelClosed -> return ()
+ _ -> do
+ writeTVar cChannel ChannelClosed
+ writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+ writeFlow cDataInternal Nothing
connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
@@ -396,6 +406,7 @@ data ChannelState = ChannelNone
data ReservedToSend = ReservedToSend
{ rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
, rsOnAck :: IO ()
+ , rsOnFail :: IO ()
}
data SentPacket = SentPacket
@@ -403,6 +414,7 @@ data SentPacket = SentPacket
, spRetryCount :: Int
, spAckedBy :: Maybe (TransportHeaderItem -> Bool)
, spOnAck :: IO ()
+ , spOnFail :: IO ()
, spData :: BC.ByteString
}
@@ -485,7 +497,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
passUpIncoming :: GlobalState addr -> STM (IO ())
passUpIncoming GlobalState {..} = do
(Connection {..}, up) <- takeTMVar gNextUp
- writeFlow cDataInternal up
+ writeFlow cDataInternal (Just up)
return $ return ()
processIncoming :: GlobalState addr -> STM (IO ())
@@ -689,7 +701,7 @@ verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpa
reservePacket :: Connection addr -> STM ReservedToSend
-reservePacket Connection {..} = do
+reservePacket conn@Connection {..} = do
maxPackets <- readTVar cMaxInFlightPackets
reserved <- readTVar cReservedPackets
sent <- length <$> readTVar cSentPackets
@@ -698,7 +710,7 @@ reservePacket Connection {..} = do
retry
writeTVar cReservedPackets $ reserved + 1
- return $ ReservedToSend Nothing (return ())
+ return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
resendBytes Connection {..} reserved sp = do
@@ -722,6 +734,7 @@ sendBytes conn reserved bs = resendBytes conn reserved
, spRetryCount = -1
, spAckedBy = rsAckedBy =<< reserved
, spOnAck = maybe (return ()) rsOnAck reserved
+ , spOnFail = maybe (return ()) rsOnFail reserved
, spData = bs
}
@@ -805,17 +818,18 @@ processOutgoing gs@GlobalState {..} = do
sps@(_:_) -> return (last sps, init sps)
_ -> retry
let nextTry = spTime sp + fromNanoSecs 1000000000
- if now < nextTry
- then do
- nextTimeout <- readTVar gNextTimeout
- if nextTimeout <= now || nextTry < nextTimeout
- then do writeTVar gNextTimeout nextTry
- return $ return ()
- else retry
- else do
- reserved <- reservePacket conn
- writeTVar cSentPackets rest
- return $ resendBytes conn (Just reserved) sp
+ if | now < nextTry -> do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || nextTry < nextTimeout
+ then do writeTVar gNextTimeout nextTry
+ return $ return ()
+ else retry
+ | spRetryCount sp < 2 -> do
+ reserved <- reservePacket conn
+ writeTVar cSentPackets rest
+ return $ resendBytes conn (Just reserved) sp
+ | otherwise -> do
+ return $ spOnFail sp
let handleControlRequests = readFlow gControlFlow >>= \case
RequestConnection addr -> do