summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-09-16 20:32:36 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-09-16 21:43:43 +0200
commitb09df3cbdc0e1ee56a4b07e3bd7594bb3ce7fd50 (patch)
tree8cad16b9ea9c91688b38b3dd18b73b1ac97dabdf
parentcbdffb714cf24eacfe08586f56109e46df234806 (diff)
Network: acknowledgment using packet counter
-rw-r--r--src/Network.hs5
-rw-r--r--src/Network/Protocol.hs64
2 files changed, 50 insertions, 19 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 8cb2ed5..1d28d68 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -451,7 +451,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
DataResponse dgst -> if
| Just pref <- find ((==dgst) . refDigest) prefs -> do
- addHeader $ Acknowledged dgst
+ when (not secure) $ do
+ addHeader $ Acknowledged dgst
liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)
| otherwise -> throwError $ "mismatched data response " ++ show dgst
@@ -470,7 +471,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityFull _ -> do
void $ newWaitingRef dgst $ handleIdentityUpdate peer
- addHeader $ Acknowledged dgst
_ -> return ()
TrChannelRequest dgst -> do
@@ -513,7 +513,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
then do
- addHeader $ Acknowledged dgst
void $ newWaitingRef dgst $ \ref ->
liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
else throwError $ "missing service object " ++ show dgst
diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs
index 0f099e0..1240fde 100644
--- a/src/Network/Protocol.hs
+++ b/src/Network/Protocol.hs
@@ -63,6 +63,7 @@ data TransportHeader = TransportHeader [TransportHeaderItem]
data TransportHeaderItem
= Acknowledged RefDigest
+ | AcknowledgedSingle Integer
| Rejected RefDigest
| ProtocolVersion Text
| Initiation RefDigest
@@ -81,10 +82,22 @@ data TransportHeaderItem
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
+isHeaderItemAcknowledged :: TransportHeaderItem -> Bool
+isHeaderItemAcknowledged = \case
+ Acknowledged {} -> False
+ AcknowledgedSingle {} -> False
+ Rejected {} -> False
+ ProtocolVersion {} -> False
+ Initiation {} -> False
+ CookieSet {} -> False
+ CookieEcho {} -> False
+ _ -> True
+
transportToObject :: PartialStorage -> TransportHeader -> PartialObject
transportToObject st (TransportHeader items) = Rec $ map single items
where single = \case
Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst)
+ AcknowledgedSingle num -> (BC.pack "ACK", RecInt num)
Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)
ProtocolVersion ver -> (BC.pack "VER", RecText ver)
Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst)
@@ -105,6 +118,7 @@ transportFromObject (Rec items) = case catMaybes $ map single items of
titems -> Just $ TransportHeader titems
where single (name, content) = if
| name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref
+ | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num
| name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref
| name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver
| name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref
@@ -142,6 +156,7 @@ data Connection addr = Connection
, cChannel :: TVar ChannelState
, cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem])
, cSentPackets :: TVar [SentPacket]
+ , cToAcknowledge :: TVar [Integer]
}
connAddress :: Connection addr -> addr
@@ -253,6 +268,7 @@ newConnection GlobalState {..} addr = do
cChannel <- newTVar ChannelNone
cSecureOutQueue <- newTQueue
cSentPackets <- newTVar []
+ cToAcknowledge <- newTVar []
let conn = Connection {..}
writeTVar gConnections (conn : conns)
@@ -285,12 +301,12 @@ processIncoming gs@GlobalState {..} = do
Just (b, enc)
| b .&. 0xE0 == 0x80 -> do
ch <- maybe (throwError "unexpected encrypted packet") return mbch
- (dec, _) <- channelDecrypt ch enc
+ (dec, counter) <- channelDecrypt ch enc
case B.uncons dec of
Just (0x00, content) -> do
objs <- deserialize content
- return (True, objs)
+ return (True, objs, Just counter)
Just (_, _) -> do
throwError "streams not implemented"
@@ -300,18 +316,22 @@ processIncoming gs@GlobalState {..} = do
| b .&. 0xE0 == 0x60 -> do
objs <- deserialize msg
- return (False, objs)
+ return (False, objs, Nothing)
| otherwise -> throwError "invalid packet"
Nothing -> throwError "empty packet"
runExceptT parse >>= \case
- Right (secure, objs)
+ Right (secure, objs, mbcounter)
| hobj:content <- objs
, Just header@(TransportHeader items) <- transportFromObject hobj
-> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case
- Just (conn, mbup) -> atomically $ do
+ Just (conn@Connection {..}, mbup) -> atomically $ do
+ case mbcounter of
+ Just counter | any isHeaderItemAcknowledged items ->
+ modifyTVar' cToAcknowledge (fromIntegral counter :)
+ _ -> return ()
processAcknowledgements gs conn items
case mbup of
Just up -> putTMVar gNextUp (conn, (secure, up))
@@ -460,25 +480,35 @@ processOutgoing gs@GlobalState {..} = do
let sendNextPacket :: Connection addr -> STM (IO ())
sendNextPacket conn@Connection {..} = do
- mbch <- readTVar cChannel >>= return . \case
- ChannelEstablished ch -> Just ch
- _ -> Nothing
+ channel <- readTVar cChannel
+ let mbch = case channel of
+ ChannelEstablished ch -> Just ch
+ _ -> Nothing
let checkOutstanding
| isJust mbch = readTQueue cSecureOutQueue
| otherwise = retry
- (secure, packet@(TransportPacket (TransportHeader hitems) content), ackedBy) <-
- checkOutstanding <|> readFlow cDataInternal
+ checkAcknowledgements
+ | isJust mbch = do
+ acks <- readTVar cToAcknowledge
+ if null acks then retry
+ else return (True, TransportPacket (TransportHeader []) [], [])
+ | otherwise = retry
+
+ (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <-
+ checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements
when (isNothing mbch && secure) $ do
- writeTQueue cSecureOutQueue (secure, packet, ackedBy)
+ writeTQueue cSecureOutQueue (secure, packet, plainAckedBy)
- channel <- readTVar cChannel
+ acknowledge <- case mbch of
+ Nothing -> return []
+ Just _ -> swapTVar cToAcknowledge []
return $ do
cookieHeaders <- generateCookieHeaders gs cAddress channel
- let header = TransportHeader $ cookieHeaders ++ hitems
+ let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems
let plain = BL.concat $
(serializeObject $ transportToObject gStorage header)
@@ -487,14 +517,16 @@ processOutgoing gs@GlobalState {..} = do
mbs <- case mbch of
Just ch -> do
runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case
- Right (ctext, _) -> return $ Just $ 0x80 `B.cons` ctext
+ Right (ctext, counter) -> do
+ let isAcked = any isHeaderItemAcknowledged hitems
+ return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
return Nothing
Nothing | secure -> return Nothing
- | otherwise -> return $ Just $ BL.toStrict plain
+ | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy)
case mbs of
- Just bs -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ Just (bs, ackedBy) -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)
Nothing -> return ()
let retransmitPacket :: Connection addr -> STM (IO ())