diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-05-25 10:40:03 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-05-25 22:40:42 +0200 |
commit | db575cad9b8a3c3c2ab9f1a71ac2ea442c761df2 (patch) | |
tree | c5b4b080a7f0767beefe06a216d720a90eb79850 /src/Erebos/Network/Protocol.hs | |
parent | e40317a3b43594c0629c8a0d1d569b4c8d55e2ae (diff) |
Handle peer reconnection after its restart
Allow the handshake and channel request to proceed correcty even with
already established channel to handle the case where peer attempts to
reconnect after restart.
Changelog: Handle peer reconnection after its restart
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 88 |
1 files changed, 44 insertions, 44 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 27e05ba..a669988 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -174,6 +174,7 @@ data Connection addr = Connection , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) , cChannel :: TVar ChannelState + , cCookie :: TVar (Maybe Cookie) , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cMaxInFlightPackets :: TVar Int , cReservedPackets :: TVar Int @@ -240,9 +241,9 @@ connAddWriteStream conn@Connection {..} = do mbReserved = Just reserved mbch <- atomically (readTVar cChannel) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing mbs <- case mbch of Just ch -> do @@ -373,11 +374,11 @@ wrDigest = refDigest . wrefPartial data ChannelState = ChannelNone | ChannelCookieWait -- sent initiation, waiting for response - | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet - | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side - | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest) - | ChannelPeerRequest (Maybe Cookie) WaitingRef - | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel + | ChannelCookieReceived -- received cookie, but no cookie echo yet + | ChannelCookieConfirmed -- received cookie echo, no need to send from our side + | ChannelOurRequest (Stored ChannelRequest) + | ChannelPeerRequest WaitingRef + | ChannelOurAccept (Stored ChannelAccept) Channel | ChannelEstablished Channel data ReservedToSend = ReservedToSend @@ -456,6 +457,7 @@ newConnection cGlobalState@GlobalState {..} addr = do let cAddress = addr (cDataUp, cDataInternal) <- newFlow cChannel <- newTVar ChannelNone + cCookie <- newTVar Nothing cSecureOutQueue <- newTQueue cMaxInFlightPackets <- newTVar 4 cReservedPackets <- newTVar 0 @@ -485,9 +487,9 @@ processIncoming gs@GlobalState {..} = do mbch <- case mbconn of Nothing -> return Nothing Just conn -> readTVar (cChannel conn) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing return $ do let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict @@ -582,11 +584,12 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor True -> do atomically $ do conn@Connection {..} <- getConnection gs addr - channel <- readTVar cChannel + oldCookie <- readTVar cCookie let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header - case received `mplus` channelCurrentCookie channel of + case received `mplus` oldCookie of Just current -> do - cookieEchoReceived gs conn mbpid current + writeTVar cCookie (Just current) + cookieEchoReceived gs conn mbpid return $ Just (conn, Just packet) Nothing -> do gLog $ show addr <> ": missing cookie set, dropping " <> show header @@ -603,7 +606,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor -> do atomically $ readTVar cChannel >>= \case ChannelCookieWait -> do - writeTVar cChannel $ ChannelCookieReceived cookie + writeTVar cChannel $ ChannelCookieReceived + writeTVar cCookie $ Just cookie writeFlow gControlFlow (NewConnection conn mbpid) return $ Just (conn, Nothing) _ -> return Nothing @@ -642,17 +646,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions -channelCurrentCookie :: ChannelState -> Maybe Cookie -channelCurrentCookie = \case - ChannelCookieReceived cookie -> Just cookie - ChannelCookieConfirmed cookie -> Just cookie - ChannelOurRequest mbcookie _ -> mbcookie - ChannelPeerRequest mbcookie _ -> mbcookie - ChannelOurAccept mbcookie _ _ -> mbcookie - _ -> Nothing - -cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM () -cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do +cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM () +cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do readTVar cChannel >>= \case ChannelNone -> newConn ChannelCookieWait -> newConn @@ -660,18 +655,18 @@ cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do _ -> return () where update = do - writeTVar cChannel $ ChannelCookieConfirmed cookieSet + writeTVar cChannel ChannelCookieConfirmed newConn = do update writeFlow gControlFlow (NewConnection conn mbpid) -generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem] -generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ] +generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem] +generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ] where - echoHeader = return $ CookieEcho <$> channelCurrentCookie ch + echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie) setHeader = case ch of - ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr - ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr + ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress + ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie @@ -754,27 +749,32 @@ processOutgoing gs@GlobalState {..} = do Just _ -> swapTVar cToAcknowledge [] return $ do - cookieHeaders <- generateCookieHeaders gs cAddress channel - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems + let onAck = sequence_ $ map (streamAccepted conn) $ + catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let plain = BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders = + let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems + in BL.concat $ + (serializeObject $ transportToObject gStorage header) + : map lazyLoadBytes content - let onAck = case catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) of - [] -> return () - xs -> sequence_ $ map (streamAccepted conn) xs + let usePlaintext = do + plain <- mkPlain <$> generateCookieHeaders conn channel + return $ Just (BL.toStrict plain, plainAckedBy) - mbs <- case (secure, mbch) of - (PlaintextOnly, _) -> return $ Just (BL.toStrict plain, plainAckedBy) - (PlaintextAllowed, Nothing) -> return $ Just (BL.toStrict plain, plainAckedBy) - (_, Just ch) -> do + let useEncryption ch = do + plain <- mkPlain <$> return [] runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err return Nothing + + mbs <- case (secure, mbch) of + (PlaintextOnly, _) -> usePlaintext + (PlaintextAllowed, Nothing) -> usePlaintext + (_, Just ch) -> useEncryption ch (EncryptedOnly, Nothing) -> return Nothing case mbs of |