From b09df3cbdc0e1ee56a4b07e3bd7594bb3ce7fd50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 16 Sep 2023 20:32:36 +0200 Subject: Network: acknowledgment using packet counter --- src/Network.hs | 5 ++-- src/Network/Protocol.hs | 64 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/Network.hs b/src/Network.hs index 8cb2ed5..1d28d68 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -451,7 +451,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = DataResponse dgst -> if | Just pref <- find ((==dgst) . refDigest) prefs -> do - addHeader $ Acknowledged dgst + when (not secure) $ do + addHeader $ Acknowledged dgst liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) | otherwise -> throwError $ "mismatched data response " ++ show dgst @@ -470,7 +471,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = readTVarP (peerIdentityVar peer) >>= \case PeerIdentityFull _ -> do void $ newWaitingRef dgst $ handleIdentityUpdate peer - addHeader $ Acknowledged dgst _ -> return () TrChannelRequest dgst -> do @@ -513,7 +513,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} then do - addHeader $ Acknowledged dgst void $ newWaitingRef dgst $ \ref -> liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) else throwError $ "missing service object " ++ show dgst diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index 0f099e0..1240fde 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -63,6 +63,7 @@ data TransportHeader = TransportHeader [TransportHeaderItem] data TransportHeaderItem = Acknowledged RefDigest + | AcknowledgedSingle Integer | Rejected RefDigest | ProtocolVersion Text | Initiation RefDigest @@ -81,10 +82,22 @@ data TransportHeaderItem newtype Cookie = Cookie ByteString deriving (Eq, Show) +isHeaderItemAcknowledged :: TransportHeaderItem -> Bool +isHeaderItemAcknowledged = \case + Acknowledged {} -> False + AcknowledgedSingle {} -> False + Rejected {} -> False + ProtocolVersion {} -> False + Initiation {} -> False + CookieSet {} -> False + CookieEcho {} -> False + _ -> True + transportToObject :: PartialStorage -> TransportHeader -> PartialObject transportToObject st (TransportHeader items) = Rec $ map single items where single = \case Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) + AcknowledgedSingle num -> (BC.pack "ACK", RecInt num) Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst) ProtocolVersion ver -> (BC.pack "VER", RecText ver) Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst) @@ -105,6 +118,7 @@ transportFromObject (Rec items) = case catMaybes $ map single items of titems -> Just $ TransportHeader titems where single (name, content) = if | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref + | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref @@ -142,6 +156,7 @@ data Connection addr = Connection , cChannel :: TVar ChannelState , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) , cSentPackets :: TVar [SentPacket] + , cToAcknowledge :: TVar [Integer] } connAddress :: Connection addr -> addr @@ -253,6 +268,7 @@ newConnection GlobalState {..} addr = do cChannel <- newTVar ChannelNone cSecureOutQueue <- newTQueue cSentPackets <- newTVar [] + cToAcknowledge <- newTVar [] let conn = Connection {..} writeTVar gConnections (conn : conns) @@ -285,12 +301,12 @@ processIncoming gs@GlobalState {..} = do Just (b, enc) | b .&. 0xE0 == 0x80 -> do ch <- maybe (throwError "unexpected encrypted packet") return mbch - (dec, _) <- channelDecrypt ch enc + (dec, counter) <- channelDecrypt ch enc case B.uncons dec of Just (0x00, content) -> do objs <- deserialize content - return (True, objs) + return (True, objs, Just counter) Just (_, _) -> do throwError "streams not implemented" @@ -300,18 +316,22 @@ processIncoming gs@GlobalState {..} = do | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg - return (False, objs) + return (False, objs, Nothing) | otherwise -> throwError "invalid packet" Nothing -> throwError "empty packet" runExceptT parse >>= \case - Right (secure, objs) + Right (secure, objs, mbcounter) | hobj:content <- objs , Just header@(TransportHeader items) <- transportFromObject hobj -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case - Just (conn, mbup) -> atomically $ do + Just (conn@Connection {..}, mbup) -> atomically $ do + case mbcounter of + Just counter | any isHeaderItemAcknowledged items -> + modifyTVar' cToAcknowledge (fromIntegral counter :) + _ -> return () processAcknowledgements gs conn items case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) @@ -460,25 +480,35 @@ processOutgoing gs@GlobalState {..} = do let sendNextPacket :: Connection addr -> STM (IO ()) sendNextPacket conn@Connection {..} = do - mbch <- readTVar cChannel >>= return . \case - ChannelEstablished ch -> Just ch - _ -> Nothing + channel <- readTVar cChannel + let mbch = case channel of + ChannelEstablished ch -> Just ch + _ -> Nothing let checkOutstanding | isJust mbch = readTQueue cSecureOutQueue | otherwise = retry - (secure, packet@(TransportPacket (TransportHeader hitems) content), ackedBy) <- - checkOutstanding <|> readFlow cDataInternal + checkAcknowledgements + | isJust mbch = do + acks <- readTVar cToAcknowledge + if null acks then retry + else return (True, TransportPacket (TransportHeader []) [], []) + | otherwise = retry + + (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <- + checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements when (isNothing mbch && secure) $ do - writeTQueue cSecureOutQueue (secure, packet, ackedBy) + writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) - channel <- readTVar cChannel + acknowledge <- case mbch of + Nothing -> return [] + Just _ -> swapTVar cToAcknowledge [] return $ do cookieHeaders <- generateCookieHeaders gs cAddress channel - let header = TransportHeader $ cookieHeaders ++ hitems + let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems let plain = BL.concat $ (serializeObject $ transportToObject gStorage header) @@ -487,14 +517,16 @@ processOutgoing gs@GlobalState {..} = do mbs <- case mbch of Just ch -> do runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case - Right (ctext, _) -> return $ Just $ 0x80 `B.cons` ctext + Right (ctext, counter) -> do + let isAcked = any isHeaderItemAcknowledged hitems + return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err return Nothing Nothing | secure -> return Nothing - | otherwise -> return $ Just $ BL.toStrict plain + | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy) case mbs of - Just bs -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) + Just (bs, ackedBy) -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) Nothing -> return () let retransmitPacket :: Connection addr -> STM (IO ()) -- cgit v1.2.3