diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-01 23:01:30 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-12 23:08:55 +0200 |
commit | bda62efef1ad38779f23b38b4e1436f06fb9c7c1 (patch) | |
tree | d7efc976a63f97bea3e8877c30b291225f05ded3 | |
parent | 71cfee5086a6bf1c7a810d83fd67320bb9552197 (diff) |
Network protocol refactoring with explicit data flows
-rw-r--r-- | erebos.cabal | 3 | ||||
-rw-r--r-- | src/Flow.hs | 52 | ||||
-rw-r--r-- | src/ICE.chs | 17 | ||||
-rw-r--r-- | src/Network.hs | 451 | ||||
-rw-r--r-- | src/Network/Protocol.hs | 325 |
5 files changed, 532 insertions, 316 deletions
diff --git a/erebos.cabal b/erebos.cabal index f5d3fb9..12cb9ac 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -24,8 +24,10 @@ executable erebos Channel, Contact Discovery + Flow Message, Network, + Network.Protocol Pairing PubKey, Service @@ -65,6 +67,7 @@ executable erebos UndecidableInstances build-depends: aeson >=1.4 && <2.1, + async >=2.2 && <2.3, base >=4.13 && <4.17, binary >=0.8 && <0.11, bytestring >=0.10 && <0.12, diff --git a/src/Flow.hs b/src/Flow.hs new file mode 100644 index 0000000..349178f --- /dev/null +++ b/src/Flow.hs @@ -0,0 +1,52 @@ +module Flow ( + Flow, SymFlow, + newFlow, newFlowIO, + readFlow, writeFlow, writeFlowBulk, + readFlowIO, writeFlowIO, + + mapPath, +) where + +import Control.Concurrent.STM + + +data Flow r w = Flow (TMVar [r]) (TMVar [w]) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') + +type SymFlow a = Flow a a + +newFlow :: STM (Flow a b, Flow b a) +newFlow = do + x <- newEmptyTMVar + y <- newEmptyTMVar + return (Flow x y, Flow y x) + +newFlowIO :: IO (Flow a b, Flow b a) +newFlowIO = atomically newFlow + +readFlow :: Flow r w -> STM r +readFlow (Flow rvar _) = takeTMVar rvar >>= \case + (x:[]) -> return x + (x:xs) -> putTMVar rvar xs >> return x + [] -> error "Flow: empty list" +readFlow (MappedFlow f _ up) = f <$> readFlow up + +writeFlow :: Flow r w -> w -> STM () +writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (MappedFlow _ f up) = writeFlow up . f + +writeFlowBulk :: Flow r w -> [w] -> STM () +writeFlowBulk _ [] = return () +writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs + +readFlowIO :: Flow r w -> IO r +readFlowIO path = atomically $ readFlow path + +writeFlowIO :: Flow r w -> w -> IO () +writeFlowIO path = atomically . writeFlow path + + +mapPath :: (r -> r') -> (w' -> w) -> Flow r w -> Flow r' w' +mapPath rf wf (MappedFlow rf' wf' up) = MappedFlow (rf . rf') (wf' . wf) up +mapPath rf wf up = MappedFlow rf wf up diff --git a/src/ICE.chs b/src/ICE.chs index 98584a2..d553a88 100644 --- a/src/ICE.chs +++ b/src/ICE.chs @@ -17,7 +17,6 @@ module ICE ( ) where import Control.Arrow -import Control.Concurrent.Chan import Control.Concurrent.MVar import Control.Monad import Control.Monad.Except @@ -31,6 +30,7 @@ import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T import qualified Data.Text.Read as T +import Data.Void import Foreign.C.String import Foreign.C.Types @@ -39,17 +39,16 @@ import Foreign.Marshal.Array import Foreign.Ptr import Foreign.StablePtr +import Flow import Storage #include "pjproject.h" data IceSession = IceSession { isStrans :: PjIceStrans - , isChan :: MVar (Either [ByteString] (MappedChan ByteString)) + , isChan :: MVar (Either [ByteString] (Flow Void ByteString)) } -data MappedChan a = forall b. MappedChan (a -> b) (Chan b) - instance Eq IceSession where (==) = (==) `on` isStrans @@ -188,13 +187,13 @@ foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb = join . deRefStablePtr -iceSetChan :: IceSession -> (ByteString -> a) -> Chan a -> IO () -iceSetChan sess f chan = do +iceSetChan :: IceSession -> Flow Void ByteString -> IO () +iceSetChan sess chan = do modifyMVar_ (isChan sess) $ \orig -> do case orig of - Left buf -> writeList2Chan chan $ map f $ reverse buf + Left buf -> mapM_ (writeFlowIO chan) $ reverse buf Right _ -> return () - return $ Right $ MappedChan f chan + return $ Right chan foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () @@ -202,5 +201,5 @@ ice_rx_data sptr buf len = do sess <- deRefStablePtr sptr bs <- packCStringLen (buf, len) modifyMVar_ (isChan sess) $ \case - mc@(Right (MappedChan f chan)) -> writeChan chan (f bs) >> return mc + mc@(Right chan) -> writeFlowIO chan bs >> return mc Left bss -> return $ Left (bs:bss) diff --git a/src/Network.hs b/src/Network.hs index da786c6..787bff9 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -8,7 +8,6 @@ module Network ( Peer, peerServer, peerStorage, PeerAddress(..), peerAddress, PeerIdentity(..), peerIdentity, - PeerChannel(..), WaitingRef, wrDigest, Service(..), serverPeer, serverPeerIce, @@ -18,7 +17,6 @@ module Network ( discoveryPort, ) where -import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Exception @@ -45,11 +43,10 @@ import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket import qualified Network.Socket.ByteString as S -import System.Clock - import Channel import ICE import Identity +import Network.Protocol import PubKey import Service import State @@ -70,7 +67,8 @@ data Server = Server , serverIdentity_ :: MVar UnifiedIdentity , serverThreads :: MVar [ThreadId] , serverSocket :: MVar Socket - , serverChanPacket :: Chan (PeerAddress, BC.ByteString) + , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) + , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress , serverDataResponse :: TQueue (Peer, Maybe PartialRef) , serverIOActions :: TQueue (ExceptT String IO ()) , serverServices :: [SomeService] @@ -101,30 +99,29 @@ defaultServerOptions = ServerOptions data Peer = Peer { peerAddress :: PeerAddress , peerServer_ :: Server + , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress)) , peerIdentityVar :: TVar PeerIdentity - , peerChannel :: TVar PeerChannel , peerStorage_ :: Storage , peerInStorage :: PartialStorage - , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket) - , peerSentPackets :: TVar [SentPacket] , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) - , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)] , peerWaitingRefs :: TMVar [WaitingRef] } -data SentPacket = SentPacket - { spTime :: TimeSpec - , spRetryCount :: Int - , spAckedBy :: [TransportHeaderItem] - , spData :: BC.ByteString - } - peerServer :: Peer -> Server peerServer = peerServer_ peerStorage :: Peer -> Storage peerStorage = peerStorage_ +getPeerChannel :: Peer -> STM ChannelState +getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection + +setPeerChannel :: Peer -> ChannelState -> STM () +setPeerChannel Peer {..} ch = do + readTVar peerConnection >>= \case + Left _ -> retry + Right conn -> connSetChannel conn ch + instance Eq Peer where (==) = (==) `on` peerIdentityVar @@ -157,89 +154,21 @@ data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) | PeerIdentityFull UnifiedIdentity -data PeerChannel = ChannelWait - | ChannelOurRequest (Stored ChannelRequest) - | ChannelPeerRequest WaitingRef - | ChannelOurAccept (Stored ChannelAccept) Channel - | ChannelEstablished Channel - peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data TransportPacket = TransportPacket TransportHeader [Ref] - - -data TransportHeaderItem - = Acknowledged PartialRef - | Rejected PartialRef - | DataRequest PartialRef - | DataResponse PartialRef - | AnnounceSelf PartialRef - | AnnounceUpdate PartialRef - | TrChannelRequest PartialRef - | TrChannelAccept PartialRef - | ServiceType ServiceID - | ServiceRef PartialRef - deriving (Eq) - -data TransportHeader = TransportHeader [TransportHeaderItem] - -transportToObject :: TransportHeader -> PartialObject -transportToObject (TransportHeader items) = Rec $ map single items - where single = \case - Acknowledged ref -> (BC.pack "ACK", RecRef ref) - Rejected ref -> (BC.pack "REJ", RecRef ref) - DataRequest ref -> (BC.pack "REQ", RecRef ref) - DataResponse ref -> (BC.pack "RSP", RecRef ref) - AnnounceSelf ref -> (BC.pack "ANN", RecRef ref) - AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref) - TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref) - TrChannelAccept ref -> (BC.pack "CAC", RecRef ref) - ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype) - ServiceRef ref -> (BC.pack "SRF", RecRef ref) - -transportFromObject :: PartialObject -> Maybe TransportHeader -transportFromObject (Rec items) = case catMaybes $ map single items of - [] -> Nothing - titems -> Just $ TransportHeader titems - where single (name, content) = if - | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref - | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected ref - | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref - | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref - | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref - | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref - | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref - | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref - | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid - | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref - | otherwise -> Nothing -transportFromObject _ = Nothing - lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID lookupServiceType (ServiceType stype : _) = Just stype lookupServiceType (_ : hs) = lookupServiceType hs lookupServiceType [] = Nothing -data WaitingRef = WaitingRef - { wrefStorage :: Storage - , wrefPartial :: PartialRef - , wrefAction :: Ref -> WaitingRefCallback - , wrefStatus :: TVar (Either [RefDigest] Ref) - } - -type WaitingRefCallback = ExceptT String IO () - -wrDigest :: WaitingRef -> RefDigest -wrDigest = refDigest . wrefPartial - -newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef -newWaitingRef pref act = do - peer <- gets phPeer - wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left [])) - modifyTMVarP (peerWaitingRefs peer) (wref:) +newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef dgst act = do + peer@Peer {..} <- gets phPeer + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) + modifyTMVarP peerWaitingRefs (wref:) liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) return wref @@ -254,7 +183,8 @@ startServer opt serverOrigHead logd' serverServices = do serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead serverThreads <- newMVar [] serverSocket <- newEmptyMVar - serverChanPacket <- newChan + (serverRawPath, protocolRawPath) <- newFlowIO + (serverNewConnection, protocolNewConnection) <- newFlowIO serverDataResponse <- newTQueueIO serverIOActions <- newTQueueIO serverServiceStates <- newTMVarIO M.empty @@ -289,23 +219,23 @@ startServer opt serverOrigHead logd' serverServices = do when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do readMVar serverIdentity_ >>= \identity -> do st <- derivePartialStorage serverStorage - let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] + let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] mapM_ (void . S.sendTo sock packet) broadcastAddreses threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage let selfRef = partialRef st $ storedRef $ idData identity - updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity) + updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity) ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] hitems = map AnnounceUpdate updateRefs packet = TransportPacket (TransportHeader $ hitems) [] ps <- readMVar serverPeers forM_ ps $ \peer -> atomically $ do - ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case + ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case (PeerIdentityFull _, ChannelEstablished _) -> - writeTQueue (peerOutQueue peer) (True, ackedBy, packet) + sendToPeerS peer ackedBy packet _ -> return () void $ watchHead serverOrigHead $ \h -> do @@ -325,42 +255,38 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 - writeChan serverChanPacket (DatagramAddress sock saddr, msg) + writeFlowIO serverRawPath (DatagramAddress sock saddr, msg) - forever $ do - (paddr, msg) <- readChan serverChanPacket - (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do - case M.lookup paddr pvalue of - Just peer -> do - mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ ch -> Just ch - _ -> Nothing - - if | Just ch <- mbch - , Right plain <- runExcept $ channelDecrypt ch msg - -> return (pvalue, (peer, plain, True)) - - | otherwise - -> return (pvalue, (peer, msg, False)) + forkServerThread server $ forever $ do + (paddr, msg) <- readFlowIO serverRawPath + case paddr of + DatagramAddress _ addr -> void $ S.sendTo sock msg addr + PeerIceSession ice -> iceSend ice msg + forkServerThread server $ forever $ do + conn <- readFlowIO serverNewConnection + let paddr = connAddress conn + peer <- modifyMVar serverPeers $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> return (pvalue, peer) Nothing -> do peer <- mkPeer server paddr - return (M.insert paddr peer pvalue, (peer, msg, False)) + return (M.insert paddr peer pvalue, peer) - case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of - Right (obj:objs) - | Just header <- transportFromObject obj -> do - prefs <- forM objs $ storeObject $ peerInStorage peer - identity <- readMVar serverIdentity_ - let svcs = map someServiceID serverServices - handlePacket identity secure peer chanSvc svcs header prefs + atomically $ do + readTVar (peerConnection peer) >>= \case + Left packets -> writeFlowBulk (connData conn) $ reverse packets + Right _ -> return () + writeTVar (peerConnection peer) (Right conn) - | otherwise -> atomically $ do - logd $ show paddr ++ ": invalid objects" - logd $ show objs + forkServerThread server $ forever $ do + (secure, TransportPacket header objs) <- readFlowIO $ connData conn + prefs <- forM objs $ storeObject $ peerInStorage peer + identity <- readMVar serverIdentity_ + let svcs = map someServiceID serverServices + handlePacket identity secure peer chanSvc svcs header prefs - _ -> do atomically $ logd $ show paddr ++ ": invalid objects" + erebosNetworkProtocol logd protocolRawPath protocolNewConnection forkServerThread server $ withSocketsDo $ do let hints = defaultHints @@ -383,87 +309,6 @@ stopServer :: Server -> IO () stopServer Server {..} = do mapM_ killThread =<< takeMVar serverThreads -sendWorker :: Peer -> IO () -sendWorker peer = do - startTime <- getTime MonotonicRaw - nowVar <- newTVarIO startTime - waitVar <- newTVarIO startTime - - let waitTill time = forkServerThread (peerServer peer) $ do - now <- getTime MonotonicRaw - when (time > now) $ - threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 - atomically . writeTVar nowVar =<< getTime MonotonicRaw - - let sendBytes sp = do - when (not $ null $ spAckedBy sp) $ do - now <- getTime MonotonicRaw - atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp - { spTime = now - , spRetryCount = spRetryCount sp + 1 - } - case peerAddress peer of - DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr - PeerIceSession ice -> iceSend ice (spData sp) - - let sendNextPacket = do - (secure, ackedBy, packet@(TransportPacket header content)) <- - readTQueue (peerOutQueue peer) - - let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) - let plain = BL.toStrict $ BL.concat $ - (serializeObject $ transportToObject header) - : map lazyLoadBytes content - - mbch <- readTVar (peerChannel peer) >>= \case - ChannelEstablished ch -> return (Just ch) - _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):) - return Nothing - - return $ do - mbs <- case mbch of - Just ch -> do - runExceptT (channelEncrypt ch plain) >>= \case - Right ctext -> return $ Just ctext - Left err -> do logd $ "Failed to encrypt data: " ++ err - return Nothing - Nothing | secure -> return Nothing - | otherwise -> return $ Just plain - - case mbs of - Just bs -> do - sendBytes $ SentPacket - { spTime = undefined - , spRetryCount = -1 - , spAckedBy = ackedBy - , spData = bs - } - Nothing -> return () - - let retransmitPacket = do - now <- readTVar nowVar - (sp, rest) <- readTVar (peerSentPackets peer) >>= \case - sps@(_:_) -> return (last sps, init sps) - _ -> retry - let nextTry = spTime sp + fromNanoSecs 1000000000 - if now < nextTry - then do - wait <- readTVar waitVar - if wait <= now || nextTry < wait - then do writeTVar waitVar nextTry - return $ waitTill nextTry - else retry - else do - writeTVar (peerSentPackets peer) rest - return $ sendBytes sp - - forever $ join $ atomically $ do - retransmitPacket <|> sendNextPacket - -processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM () -processAcknowledgements peer = mapM_ $ \hitem -> do - modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy - dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do (peer, npref) <- atomically (readTQueue $ serverDataResponse server) @@ -489,7 +334,7 @@ dataResponseWorker server = forever $ do Right _ -> return (Nothing, []) atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list - let reqs = concat $ map snd list + let reqs = map refDigest $ concat $ map snd list when (not $ null reqs) $ do let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] @@ -540,8 +385,7 @@ handlePacket :: UnifiedIdentity -> Bool -> TransportHeader -> [PartialRef] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer - processAcknowledgements peer headers - ochannel <- readTVar $ peerChannel peer + ochannel <- getPeerChannel peer let sidentity = idData identity plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat [ [ storedRef sidentity ] @@ -555,89 +399,89 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do let logd = liftSTM . writeTQueue (serverErrorLog server) forM_ headers $ \case - Acknowledged ref -> do - readTVarP (peerChannel peer) >>= \case - ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do - writeTVarP (peerChannel peer) $ ChannelEstablished ch - liftSTM $ finalizedChannel peer identity + Acknowledged dgst -> do + liftSTM (getPeerChannel peer) >>= \case + ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do + liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected ref -> do - logd $ "rejected by peer: " ++ show (refDigest ref) + Rejected dgst -> do + logd $ "rejected by peer: " ++ show dgst - DataRequest ref - | secure || refDigest ref `elem` plaintextRefs -> do - Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref - addHeader $ DataResponse ref - addAckedBy [ Acknowledged ref, Rejected ref ] + DataRequest dgst + | secure || dgst `elem` plaintextRefs -> do + Right mref <- liftSTM $ unsafeIOToSTM $ + copyRef (peerStorage peer) $ + partialRefFromDigest (peerInStorage peer) dgst + addHeader $ DataResponse dgst + addAckedBy [ Acknowledged dgst, Rejected dgst ] addBody $ mref | otherwise -> do - logd $ "unauthorized data request for " ++ show ref - addHeader $ Rejected ref + logd $ "unauthorized data request for " ++ show dgst + addHeader $ Rejected dgst - DataResponse ref -> if - | ref `elem` prefs -> do - addHeader $ Acknowledged ref - liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref) - | otherwise -> throwError $ "mismatched data response " ++ show ref + DataResponse dgst -> if + | Just pref <- find ((==dgst) . refDigest) prefs -> do + addHeader $ Acknowledged dgst + liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) + | otherwise -> throwError $ "mismatched data response " ++ show dgst - AnnounceSelf pref - | refDigest pref == refDigest (storedRef sidentity) -> return () + AnnounceSelf dgst + | dgst == refDigest (storedRef sidentity) -> return () | otherwise -> do - wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer + wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer readTVarP (peerIdentityVar peer) >>= \case PeerIdentityUnknown idwait -> do - let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity - addHeader $ AnnounceSelf ref + addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer _ -> return () - AnnounceUpdate ref -> do + AnnounceUpdate dgst -> do readTVarP (peerIdentityVar peer) >>= \case PeerIdentityFull _ -> do - void $ newWaitingRef ref $ handleIdentityUpdate peer - addHeader $ Acknowledged ref + void $ newWaitingRef dgst $ handleIdentityUpdate peer + addHeader $ Acknowledged dgst _ -> return () - TrChannelRequest reqref -> do + TrChannelRequest dgst -> do let process = do - addHeader $ Acknowledged reqref - wref <- newWaitingRef reqref $ handleChannelRequest peer identity - writeTVarP (peerChannel peer) $ ChannelPeerRequest wref - reject = addHeader $ Rejected reqref - - readTVarP (peerChannel peer) >>= \case - ChannelWait {} -> process - ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process + addHeader $ Acknowledged dgst + wref <- newWaitingRef dgst $ handleChannelRequest peer identity + liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref + reject = addHeader $ Rejected dgst + + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> process + ChannelOurRequest our | dgst < refDigest (storedRef our) -> process | otherwise -> reject ChannelPeerRequest {} -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process - TrChannelAccept accref -> do + TrChannelAccept dgst -> do let process = do - handleChannelAccept identity accref - readTVarP (peerChannel peer) >>= \case - ChannelWait {} -> process + handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> process ChannelOurRequest {} -> process ChannelPeerRequest {} -> process - ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process - | otherwise -> addHeader $ Rejected accref + ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process + | otherwise -> addHeader $ Rejected dgst ChannelEstablished {} -> process ServiceType _ -> return () - ServiceRef pref + ServiceRef dgst | not secure -> throwError $ "service packet without secure channel" | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do - if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -} + if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} then do - addHeader $ Acknowledged pref - void $ newWaitingRef pref $ \ref -> + addHeader $ Acknowledged dgst + void $ newWaitingRef dgst $ \ref -> liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show pref - | otherwise -> addHeader $ Rejected pref + else throwError $ "missing service object " ++ show dgst + | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" let logd = writeTQueue (serverErrorLog server) @@ -647,7 +491,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = Right ph -> do when (not $ null $ phHead ph) $ do let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) - writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet) + sendToPeerS' secure peer (phAckedBy ph) packet withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () @@ -660,18 +504,17 @@ withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback setupChannel identity peer upid = do req <- createChannelRequest (peerStorage peer) identity upid - let ist = peerInStorage peer - let reqref = partialRef ist $ storedRef req + let reqref = refDigest $ storedRef req let hitems = [ TrChannelRequest reqref - , AnnounceSelf $ partialRef ist $ storedRef $ idData identity + , AnnounceSelf $ refDigest $ storedRef $ idData identity ] liftIO $ atomically $ do - readTVar (peerChannel peer) >>= \case - ChannelWait -> do + getPeerChannel peer >>= \case + ChannelNone -> do sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ TransportPacket (TransportHeader hitems) [storedRef req] - writeTVar (peerChannel peer) $ ChannelOurRequest req + setPeerChannel peer $ ChannelOurRequest req _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -679,10 +522,10 @@ handleChannelRequest peer identity req = do withPeerIdentity peer $ \upid -> do (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req) liftIO $ atomically $ do - readTVar (peerChannel peer) >>= \case + getPeerChannel peer >>= \case ChannelPeerRequest wr | wrDigest wr == refDigest req -> do - writeTVar (peerChannel peer) $ ChannelOurAccept acc ch - let accref = (partialRef (peerInStorage peer) $ storedRef acc) + setPeerChannel peer $ ChannelOurAccept acc ch + let accref = refDigest $ storedRef acc header = TrChannelAccept accref ackedBy = [ Acknowledged accref, Rejected accref ] sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat @@ -702,32 +545,28 @@ handleChannelAccept identity accref = do Right acc -> do ch <- acceptedChannel identity upid (wrappedLoad acc) liftIO $ atomically $ do - sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] - writeTVar (peerChannel peer) $ ChannelEstablished ch - finalizedChannel peer identity + sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] + finalizedChannel peer ch identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> UnifiedIdentity -> STM () -finalizedChannel peer self = do - let ist = peerInStorage peer +finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () +finalizedChannel peer@Peer {..} ch self = do + setPeerChannel peer $ ChannelEstablished ch -- Identity update - do - let selfRef = partialRef ist $ storedRef $ idData $ self - updateRefs = selfRef : map (partialRef ist . storedRef) (idUpdates self) + writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do + let selfRef = refDigest $ storedRef $ idData $ self + updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self) ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs -- Notify services about new peer - readTVar (peerIdentityVar peer) >>= \case + readTVar peerIdentityVar >>= \case PeerIdentityFull _ -> notifyServicesOfPeer peer _ -> return () - -- Outstanding service packets - mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] - handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback handleIdentityAnnounce self peer ref = liftIO $ atomically $ do @@ -775,22 +614,14 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do mkPeer :: Server -> PeerAddress -> IO Peer -mkPeer server paddr = do - pst <- deriveEphemeralStorage $ serverStorage server - peer <- Peer - <$> pure paddr - <*> pure server - <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) - <*> newTVarIO ChannelWait - <*> pure pst - <*> derivePartialStorage pst - <*> newTQueueIO - <*> newTVarIO [] - <*> newTMVarIO M.empty - <*> newTVarIO [] - <*> newTMVarIO [] - forkServerThread server $ sendWorker peer - return peer +mkPeer peerServer_ peerAddress = do + peerConnection <- newTVarIO (Left []) + peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO [] + peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_ + peerInStorage <- derivePartialStorage peerStorage_ + peerServiceState <- newTMVarIO M.empty + peerWaitingRefs <- newTMVarIO [] + return Peer {..} serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do @@ -798,10 +629,10 @@ serverPeer server paddr = do serverPeer' server (DatagramAddress sock paddr) serverPeerIce :: Server -> IceSession -> IO Peer -serverPeerIce server ice = do +serverPeerIce server@Server {..} ice = do let paddr = PeerIceSession ice peer <- serverPeer' server paddr - iceSetChan ice (paddr,) $ serverChanPacket server + iceSetChan ice $ mapPath undefined (paddr,) serverRawPath return peer serverPeer' :: Server -> PeerAddress -> IO Peer @@ -814,9 +645,10 @@ serverPeer' server paddr = do return (M.insert paddr peer pvalue, (peer, True)) when hello $ do identity <- serverIdentity server - atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $ - TransportPacket - (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) + atomically $ do + writeFlow (serverNewConnection server) paddr + sendToPeerPlain peer [] $ TransportPacket + (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]) [] return peer @@ -830,23 +662,28 @@ sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do let st = peerStorage peer - pst = peerInStorage peer srefs <- liftIO $ fmap catMaybes $ forM parts $ \case ServiceReply (Left x) use -> Just . (,use) <$> store st x ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) ServiceFinally act -> act >> return Nothing - prefs <- mapM (copyRef pst . fst) srefs + let dgsts = map (refDigest . fst) srefs let content = map fst $ filter snd srefs - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) + header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ] + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] liftIO $ atomically $ sendToPeerS peer ackedBy packet -sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet) +sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS' secure Peer {..} ackedBy packet = do + readTVar peerConnection >>= \case + Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs + Right conn -> writeFlow (connData conn) (secure, packet, ackedBy) + +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS = sendToPeerS' True -sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet) +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerPlain = sendToPeerS' False sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs new file mode 100644 index 0000000..adc9471 --- /dev/null +++ b/src/Network/Protocol.hs @@ -0,0 +1,325 @@ +module Network.Protocol ( + TransportPacket(..), + transportToObject, + TransportHeader(..), + TransportHeaderItem(..), + + WaitingRef(..), + WaitingRefCallback, + wrDigest, + + ChannelState(..), + + erebosNetworkProtocol, + + Connection, + connAddress, + connData, + connGetChannel, + connSetChannel, + + module Flow, +) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.Except + +import Data.ByteString (ByteString) +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL +import Data.List +import Data.Maybe + +import System.Clock + +import Channel +import Flow +import Service +import Storage + + +data TransportPacket a = TransportPacket TransportHeader [a] + +data TransportHeader = TransportHeader [TransportHeaderItem] + +data TransportHeaderItem + = Acknowledged RefDigest + | Rejected RefDigest + | DataRequest RefDigest + | DataResponse RefDigest + | AnnounceSelf RefDigest + | AnnounceUpdate RefDigest + | TrChannelRequest RefDigest + | TrChannelAccept RefDigest + | ServiceType ServiceID + | ServiceRef RefDigest + deriving (Eq) + +transportToObject :: PartialStorage -> TransportHeader -> PartialObject +transportToObject st (TransportHeader items) = Rec $ map single items + where single = \case + Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) + Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst) + DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst) + DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst) + AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst) + AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst) + TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst) + TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst) + ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype) + ServiceRef dgst -> (BC.pack "SRF", RecRef $ partialRefFromDigest st dgst) + +transportFromObject :: PartialObject -> Maybe TransportHeader +transportFromObject (Rec items) = case catMaybes $ map single items of + [] -> Nothing + titems -> Just $ TransportHeader titems + where single (name, content) = if + | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref + | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref + | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref + | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref + | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref + | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref + | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref + | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref + | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid + | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref + | otherwise -> Nothing +transportFromObject _ = Nothing + + +data GlobalState addr = (Eq addr, Show addr) => GlobalState + { gConnections :: TVar [Connection addr] + , gDataFlow :: SymFlow (addr, ByteString) + , gConnectionFlow :: Flow addr (Connection addr) + , gLog :: String -> STM () + , gStorage :: PartialStorage + , gNowVar :: TVar TimeSpec + , gNextTimeout :: TVar TimeSpec + } + +data Connection addr = Connection + { cAddress :: addr + , cDataUp :: Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) + , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) + , cChannel :: TVar ChannelState + , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) + , cSentPackets :: TVar [SentPacket] + } + +connAddress :: Connection addr -> addr +connAddress = cAddress + +connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) +connData = cDataUp + +connGetChannel :: Connection addr -> STM ChannelState +connGetChannel Connection {..} = readTVar cChannel + +connSetChannel :: Connection addr -> ChannelState -> STM () +connSetChannel Connection {..} ch = do + writeTVar cChannel ch + + +data WaitingRef = WaitingRef + { wrefStorage :: Storage + , wrefPartial :: PartialRef + , wrefAction :: Ref -> WaitingRefCallback + , wrefStatus :: TVar (Either [RefDigest] Ref) + } + +type WaitingRefCallback = ExceptT String IO () + +wrDigest :: WaitingRef -> RefDigest +wrDigest = refDigest . wrefPartial + + +data ChannelState = ChannelNone + | ChannelOurRequest (Stored ChannelRequest) + | ChannelPeerRequest WaitingRef + | ChannelOurAccept (Stored ChannelAccept) Channel + | ChannelEstablished Channel + + +data SentPacket = SentPacket + { spTime :: TimeSpec + , spRetryCount :: Int + , spAckedBy :: [TransportHeaderItem] + , spData :: BC.ByteString + } + + +erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) + => (String -> STM ()) + -> SymFlow (addr, ByteString) + -> Flow addr (Connection addr) + -> IO () +erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do + gConnections <- newTVarIO [] + gStorage <- derivePartialStorage =<< memoryStorage + + startTime <- getTime MonotonicRaw + gNowVar <- newTVarIO startTime + gNextTimeout <- newTVarIO startTime + + let gs = GlobalState {..} + + let signalTimeouts = forever $ do + now <- getTime MonotonicRaw + next <- atomically $ do + writeTVar gNowVar now + readTVar gNextTimeout + + let waitTill time + | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 + | otherwise = threadDelay maxBound + waitForUpdate = atomically $ do + next' <- readTVar gNextTimeout + when (next' == next) retry + + race_ (waitTill next) waitForUpdate + + race_ signalTimeouts $ forever $ join $ atomically $ + processIncomming gs <|> processOutgoing gs + + + +getConnection :: GlobalState addr -> addr -> STM (Connection addr) +getConnection GlobalState {..} addr = do + conns <- readTVar gConnections + case find ((addr==) . cAddress) conns of + Just conn -> return conn + Nothing -> do + let cAddress = addr + (cDataUp, cDataInternal) <- newFlow + cChannel <- newTVar ChannelNone + cSecureOutQueue <- newTQueue + cSentPackets <- newTVar [] + let conn = Connection {..} + + writeTVar gConnections (conn : conns) + writeFlow gConnectionFlow conn + return conn + +processIncomming :: GlobalState addr -> STM (IO ()) +processIncomming gs@GlobalState {..} = do + (addr, msg) <- readFlow gDataFlow + conn@Connection {..} <- getConnection gs addr + + mbch <- readTVar cChannel >>= return . \case + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing + + return $ do + (content, secure) <- do + if | Just ch <- mbch + -> runExceptT (channelDecrypt ch msg) >>= \case + Right plain -> return (plain, True) + _ -> return (msg, False) + + | otherwise + -> return (msg, False) + + case runExcept $ deserializeObjects gStorage $ BL.fromStrict content of + Right (obj:objs) + | Just header@(TransportHeader items) <- transportFromObject obj -> atomically $ do + processAcknowledgements gs conn items + writeFlow cDataInternal (secure, TransportPacket header objs) + + | otherwise -> atomically $ do + gLog $ show cAddress ++ ": invalid objects" + gLog $ show objs + + _ -> do atomically $ gLog $ show cAddress ++ ": invalid objects" + + +processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) +processOutgoing gs@GlobalState {..} = do + let sendBytes :: Connection addr -> SentPacket -> IO () + sendBytes Connection {..} sp = do + now <- getTime MonotonicRaw + atomically $ do + when (not $ null $ spAckedBy sp) $ do + modifyTVar' cSentPackets $ (:) sp + { spTime = now + , spRetryCount = spRetryCount sp + 1 + } + writeFlow gDataFlow (cAddress, spData sp) + + let sendNextPacket :: Connection addr -> STM (IO ()) + sendNextPacket conn@Connection {..} = do + mbch <- readTVar cChannel >>= return . \case + ChannelEstablished ch -> Just ch + _ -> Nothing + + let checkOutstanding + | isJust mbch = readTQueue cSecureOutQueue + | otherwise = retry + + (secure, packet@(TransportPacket header content), ackedBy) <- + checkOutstanding <|> readFlow cDataInternal + + let plain = BL.toStrict $ BL.concat $ + (serializeObject $ transportToObject gStorage header) + : map lazyLoadBytes content + + when (isNothing mbch && secure) $ do + writeTQueue cSecureOutQueue (secure, packet, ackedBy) + + return $ do + mbs <- case mbch of + Just ch -> do + runExceptT (channelEncrypt ch plain) >>= \case + Right ctext -> return $ Just ctext + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + return Nothing + Nothing | secure -> return Nothing + | otherwise -> return $ Just plain + + case mbs of + Just bs -> do + sendBytes conn $ SentPacket + { spTime = undefined + , spRetryCount = -1 + , spAckedBy = ackedBy + , spData = bs + } + Nothing -> return () + + let retransmitPacket :: Connection addr -> STM (IO ()) + retransmitPacket conn@Connection {..} = do + now <- readTVar gNowVar + (sp, rest) <- readTVar cSentPackets >>= \case + sps@(_:_) -> return (last sps, init sps) + _ -> retry + let nextTry = spTime sp + fromNanoSecs 1000000000 + if now < nextTry + then do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || nextTry < nextTimeout + then do writeTVar gNextTimeout nextTry + return $ return () + else retry + else do + writeTVar cSentPackets rest + return $ sendBytes conn sp + + let establishNewConnection = do + _ <- getConnection gs =<< readFlow gConnectionFlow + return $ return () + + conns <- readTVar gConnections + msum $ concat $ + [ map retransmitPacket conns + , map sendNextPacket conns + , [ establishNewConnection ] + ] + +processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM () +processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do + modifyTVar' cSentPackets $ filter $ (hitem `notElem`) . spAckedBy |