From db575cad9b8a3c3c2ab9f1a71ac2ea442c761df2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 25 May 2024 10:40:03 +0200 Subject: Handle peer reconnection after its restart Allow the handshake and channel request to proceed correcty even with already established channel to handle the case where peer attempts to reconnect after restart. Changelog: Handle peer reconnection after its restart --- src/Erebos/Network.hs | 36 ++++++++--------- src/Erebos/Network/Protocol.hs | 88 +++++++++++++++++++++--------------------- test/network.test | 49 +++++++++++++++++++++++ 3 files changed, 111 insertions(+), 62 deletions(-) diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index cc111e2..a01bdd1 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -473,9 +473,9 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = [ [ storedRef sidentity ] , map storedRef $ idUpdates identity , case ochannel of - ChannelOurRequest _ req -> [ storedRef req ] - ChannelOurAccept _ acc _ -> [ storedRef acc ] - _ -> [] + ChannelOurRequest req -> [ storedRef req ] + ChannelOurAccept acc _ -> [ storedRef acc ] + _ -> [] ] runPacketHandler secure peer $ do @@ -483,7 +483,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = forM_ headers $ \case Acknowledged dgst -> do liftSTM (getPeerChannel peer) >>= \case - ChannelOurAccept _ acc ch | refDigest (storedRef acc) == dgst -> do + ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do liftSTM $ finalizedChannel peer ch identity _ -> return () @@ -552,22 +552,22 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = _ -> return () TrChannelRequest dgst -> do - let process cookie = do + let process = do addHeader $ Acknowledged dgst wref <- newWaitingRef dgst $ handleChannelRequest peer identity - liftSTM $ setPeerChannel peer $ ChannelPeerRequest cookie wref + liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref reject = addHeader $ Rejected dgst liftSTM (getPeerChannel peer) >>= \case ChannelNone {} -> return () ChannelCookieWait {} -> return () - ChannelCookieReceived cookie -> process $ Just cookie - ChannelCookieConfirmed cookie -> process $ Just cookie - ChannelOurRequest mbcookie our | dgst < refDigest (storedRef our) -> process mbcookie - | otherwise -> reject - ChannelPeerRequest mbcookie _ -> process mbcookie + ChannelCookieReceived {} -> process + ChannelCookieConfirmed {} -> process + ChannelOurRequest our | dgst < refDigest (storedRef our) -> process + | otherwise -> reject + ChannelPeerRequest {} -> process ChannelOurAccept {} -> reject - ChannelEstablished {} -> process Nothing + ChannelEstablished {} -> process TrChannelAccept dgst -> do let process = do @@ -580,8 +580,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieConfirmed {} -> reject ChannelOurRequest {} -> process ChannelPeerRequest {} -> process - ChannelOurAccept _ our _ | dgst < refDigest (storedRef our) -> process - | otherwise -> addHeader $ Rejected dgst + ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process + | otherwise -> addHeader $ Rejected dgst ChannelEstablished {} -> process ServiceType _ -> return () @@ -617,10 +617,10 @@ setupChannel identity peer upid = do ] liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed cookie -> do + ChannelCookieConfirmed -> do sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest (Just cookie) req + setPeerChannel peer $ ChannelOurRequest req _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -629,8 +629,8 @@ handleChannelRequest peer identity req = do (acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req) liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelPeerRequest mbcookie wr | wrDigest wr == refDigest req -> do - setPeerChannel peer $ ChannelOurAccept mbcookie acc ch + ChannelPeerRequest wr | wrDigest wr == refDigest req -> do + setPeerChannel peer $ ChannelOurAccept acc ch let accref = refDigest $ storedRef acc header = TrChannelAccept accref ackedBy = [ Acknowledged accref, Rejected accref ] diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 27e05ba..a669988 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -174,6 +174,7 @@ data Connection addr = Connection , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) , cChannel :: TVar ChannelState + , cCookie :: TVar (Maybe Cookie) , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cMaxInFlightPackets :: TVar Int , cReservedPackets :: TVar Int @@ -240,9 +241,9 @@ connAddWriteStream conn@Connection {..} = do mbReserved = Just reserved mbch <- atomically (readTVar cChannel) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing mbs <- case mbch of Just ch -> do @@ -373,11 +374,11 @@ wrDigest = refDigest . wrefPartial data ChannelState = ChannelNone | ChannelCookieWait -- sent initiation, waiting for response - | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet - | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side - | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest) - | ChannelPeerRequest (Maybe Cookie) WaitingRef - | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel + | ChannelCookieReceived -- received cookie, but no cookie echo yet + | ChannelCookieConfirmed -- received cookie echo, no need to send from our side + | ChannelOurRequest (Stored ChannelRequest) + | ChannelPeerRequest WaitingRef + | ChannelOurAccept (Stored ChannelAccept) Channel | ChannelEstablished Channel data ReservedToSend = ReservedToSend @@ -456,6 +457,7 @@ newConnection cGlobalState@GlobalState {..} addr = do let cAddress = addr (cDataUp, cDataInternal) <- newFlow cChannel <- newTVar ChannelNone + cCookie <- newTVar Nothing cSecureOutQueue <- newTQueue cMaxInFlightPackets <- newTVar 4 cReservedPackets <- newTVar 0 @@ -485,9 +487,9 @@ processIncoming gs@GlobalState {..} = do mbch <- case mbconn of Nothing -> return Nothing Just conn -> readTVar (cChannel conn) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing return $ do let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict @@ -582,11 +584,12 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor True -> do atomically $ do conn@Connection {..} <- getConnection gs addr - channel <- readTVar cChannel + oldCookie <- readTVar cCookie let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header - case received `mplus` channelCurrentCookie channel of + case received `mplus` oldCookie of Just current -> do - cookieEchoReceived gs conn mbpid current + writeTVar cCookie (Just current) + cookieEchoReceived gs conn mbpid return $ Just (conn, Just packet) Nothing -> do gLog $ show addr <> ": missing cookie set, dropping " <> show header @@ -603,7 +606,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor -> do atomically $ readTVar cChannel >>= \case ChannelCookieWait -> do - writeTVar cChannel $ ChannelCookieReceived cookie + writeTVar cChannel $ ChannelCookieReceived + writeTVar cCookie $ Just cookie writeFlow gControlFlow (NewConnection conn mbpid) return $ Just (conn, Nothing) _ -> return Nothing @@ -642,17 +646,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions -channelCurrentCookie :: ChannelState -> Maybe Cookie -channelCurrentCookie = \case - ChannelCookieReceived cookie -> Just cookie - ChannelCookieConfirmed cookie -> Just cookie - ChannelOurRequest mbcookie _ -> mbcookie - ChannelPeerRequest mbcookie _ -> mbcookie - ChannelOurAccept mbcookie _ _ -> mbcookie - _ -> Nothing - -cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM () -cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do +cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM () +cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do readTVar cChannel >>= \case ChannelNone -> newConn ChannelCookieWait -> newConn @@ -660,18 +655,18 @@ cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do _ -> return () where update = do - writeTVar cChannel $ ChannelCookieConfirmed cookieSet + writeTVar cChannel ChannelCookieConfirmed newConn = do update writeFlow gControlFlow (NewConnection conn mbpid) -generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem] -generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ] +generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem] +generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ] where - echoHeader = return $ CookieEcho <$> channelCurrentCookie ch + echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie) setHeader = case ch of - ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr - ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr + ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress + ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie @@ -754,27 +749,32 @@ processOutgoing gs@GlobalState {..} = do Just _ -> swapTVar cToAcknowledge [] return $ do - cookieHeaders <- generateCookieHeaders gs cAddress channel - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems + let onAck = sequence_ $ map (streamAccepted conn) $ + catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let plain = BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders = + let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems + in BL.concat $ + (serializeObject $ transportToObject gStorage header) + : map lazyLoadBytes content - let onAck = case catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) of - [] -> return () - xs -> sequence_ $ map (streamAccepted conn) xs + let usePlaintext = do + plain <- mkPlain <$> generateCookieHeaders conn channel + return $ Just (BL.toStrict plain, plainAckedBy) - mbs <- case (secure, mbch) of - (PlaintextOnly, _) -> return $ Just (BL.toStrict plain, plainAckedBy) - (PlaintextAllowed, Nothing) -> return $ Just (BL.toStrict plain, plainAckedBy) - (_, Just ch) -> do + let useEncryption ch = do + plain <- mkPlain <$> return [] runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err return Nothing + + mbs <- case (secure, mbch) of + (PlaintextOnly, _) -> usePlaintext + (PlaintextAllowed, Nothing) -> usePlaintext + (_, Just ch) -> useEncryption ch (EncryptedOnly, Nothing) -> return Nothing case mbs of diff --git a/test/network.test b/test/network.test index 93f9743..ea57a77 100644 --- a/test/network.test +++ b/test/network.test @@ -176,3 +176,52 @@ test ManyStreams: send "test-message-send 1 $ref" expect /test-message-send done/ expect /test-message-received blob 100[2-4] $ref/ from p2 + + +test Reconnection: + spawn as p1 + with p1: + send "create-identity Device1" + send "start-server" + + node n + local: + spawn as p2 on n + send "create-identity Device2" to p2 + send "start-server" to p2 + + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "message1" + send "" + expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + + send "test-message-send 1 $message" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $message/ from p2 + + local: + spawn as p2 on n + send "start-server" to p2 + send "peer-add ${p1.node.ip}" to p2 + + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "message2" + send "" + expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + + send "test-message-send 1 $message" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $message/ from p2 -- cgit v1.2.3