diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 5 | ||||
| -rw-r--r-- | src/Network/Protocol.hs | 64 | 
2 files changed, 50 insertions, 19 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 8cb2ed5..1d28d68 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -451,7 +451,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =              DataResponse dgst -> if                  | Just pref <- find ((==dgst) . refDigest) prefs -> do -                    addHeader $ Acknowledged dgst +                    when (not secure) $ do +                        addHeader $ Acknowledged dgst                      liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)                  | otherwise -> throwError $ "mismatched data response " ++ show dgst @@ -470,7 +471,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =                  readTVarP (peerIdentityVar peer) >>= \case                      PeerIdentityFull _ -> do                          void $ newWaitingRef dgst $ handleIdentityUpdate peer -                        addHeader $ Acknowledged dgst                      _ -> return ()              TrChannelRequest dgst -> do @@ -513,7 +513,6 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =                      | svc `elem` svcs -> do                          if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}                             then do -                                addHeader $ Acknowledged dgst                                  void $ newWaitingRef dgst $ \ref ->                                      liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)                             else throwError $ "missing service object " ++ show dgst diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index 0f099e0..1240fde 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -63,6 +63,7 @@ data TransportHeader = TransportHeader [TransportHeaderItem]  data TransportHeaderItem      = Acknowledged RefDigest +    | AcknowledgedSingle Integer      | Rejected RefDigest      | ProtocolVersion Text      | Initiation RefDigest @@ -81,10 +82,22 @@ data TransportHeaderItem  newtype Cookie = Cookie ByteString      deriving (Eq, Show) +isHeaderItemAcknowledged :: TransportHeaderItem -> Bool +isHeaderItemAcknowledged = \case +    Acknowledged {} -> False +    AcknowledgedSingle {} -> False +    Rejected {} -> False +    ProtocolVersion {} -> False +    Initiation {} -> False +    CookieSet {} -> False +    CookieEcho {} -> False +    _ -> True +  transportToObject :: PartialStorage -> TransportHeader -> PartialObject  transportToObject st (TransportHeader items) = Rec $ map single items      where single = \case                Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) +              AcknowledgedSingle num -> (BC.pack "ACK", RecInt num)                Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)                ProtocolVersion ver -> (BC.pack "VER", RecText ver)                Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst) @@ -105,6 +118,7 @@ transportFromObject (Rec items) = case catMaybes $ map single items of                                         titems -> Just $ TransportHeader titems      where single (name, content) = if                | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref +              | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num                | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref                | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver                | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref @@ -142,6 +156,7 @@ data Connection addr = Connection      , cChannel :: TVar ChannelState      , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem])      , cSentPackets :: TVar [SentPacket] +    , cToAcknowledge :: TVar [Integer]      }  connAddress :: Connection addr -> addr @@ -253,6 +268,7 @@ newConnection GlobalState {..} addr = do      cChannel <- newTVar ChannelNone      cSecureOutQueue <- newTQueue      cSentPackets <- newTVar [] +    cToAcknowledge <- newTVar []      let conn = Connection {..}      writeTVar gConnections (conn : conns) @@ -285,12 +301,12 @@ processIncoming gs@GlobalState {..} = do                  Just (b, enc)                      | b .&. 0xE0 == 0x80 -> do                          ch <- maybe (throwError "unexpected encrypted packet") return mbch -                        (dec, _) <- channelDecrypt ch enc +                        (dec, counter) <- channelDecrypt ch enc                          case B.uncons dec of                              Just (0x00, content) -> do                                  objs <- deserialize content -                                return (True, objs) +                                return (True, objs, Just counter)                              Just (_, _) -> do                                  throwError "streams not implemented" @@ -300,18 +316,22 @@ processIncoming gs@GlobalState {..} = do                      | b .&. 0xE0 == 0x60 -> do                          objs <- deserialize msg -                        return (False, objs) +                        return (False, objs, Nothing)                      | otherwise -> throwError "invalid packet"                  Nothing -> throwError "empty packet"          runExceptT parse >>= \case -            Right (secure, objs) +            Right (secure, objs, mbcounter)                  | hobj:content <- objs                  , Just header@(TransportHeader items) <- transportFromObject hobj                  -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case -                    Just (conn, mbup) -> atomically $ do +                    Just (conn@Connection {..}, mbup) -> atomically $ do +                        case mbcounter of +                            Just counter | any isHeaderItemAcknowledged items -> +                                modifyTVar' cToAcknowledge (fromIntegral counter :) +                            _ -> return ()                          processAcknowledgements gs conn items                          case mbup of                              Just up -> putTMVar gNextUp (conn, (secure, up)) @@ -460,25 +480,35 @@ processOutgoing gs@GlobalState {..} = do      let sendNextPacket :: Connection addr -> STM (IO ())          sendNextPacket conn@Connection {..} = do -            mbch <- readTVar cChannel >>= return . \case -                ChannelEstablished ch -> Just ch -                _                     -> Nothing +            channel <- readTVar cChannel +            let mbch = case channel of +                    ChannelEstablished ch -> Just ch +                    _                     -> Nothing              let checkOutstanding                      | isJust mbch = readTQueue cSecureOutQueue                      | otherwise = retry -            (secure, packet@(TransportPacket (TransportHeader hitems) content), ackedBy) <- -                checkOutstanding <|> readFlow cDataInternal +                checkAcknowledgements +                    | isJust mbch = do +                        acks <- readTVar cToAcknowledge +                        if null acks then retry +                                     else return (True, TransportPacket (TransportHeader []) [], []) +                    | otherwise = retry + +            (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <- +                checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements              when (isNothing mbch && secure) $ do -                writeTQueue cSecureOutQueue (secure, packet, ackedBy) +                writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) -            channel <- readTVar cChannel +            acknowledge <- case mbch of +                Nothing -> return [] +                Just _ -> swapTVar cToAcknowledge []              return $ do                  cookieHeaders <- generateCookieHeaders gs cAddress channel -                let header = TransportHeader $ cookieHeaders ++ hitems +                let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems                  let plain = BL.concat $                          (serializeObject $ transportToObject gStorage header) @@ -487,14 +517,16 @@ processOutgoing gs@GlobalState {..} = do                  mbs <- case mbch of                      Just ch -> do                          runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case -                            Right (ctext, _) -> return $ Just $ 0x80 `B.cons` ctext +                            Right (ctext, counter) -> do +                                let isAcked = any isHeaderItemAcknowledged hitems +                                return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])                              Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err                                             return Nothing                      Nothing | secure    -> return Nothing -                            | otherwise -> return $ Just $ BL.toStrict plain +                            | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy)                  case mbs of -                    Just bs -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) +                    Just (bs, ackedBy) -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)                      Nothing -> return ()      let retransmitPacket :: Connection addr -> STM (IO ()) |