diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Flow.hs | 52 | ||||
| -rw-r--r-- | src/ICE.chs | 17 | ||||
| -rw-r--r-- | src/Network.hs | 451 | ||||
| -rw-r--r-- | src/Network/Protocol.hs | 325 | 
4 files changed, 529 insertions, 316 deletions
| diff --git a/src/Flow.hs b/src/Flow.hs new file mode 100644 index 0000000..349178f --- /dev/null +++ b/src/Flow.hs @@ -0,0 +1,52 @@ +module Flow ( +    Flow, SymFlow, +    newFlow, newFlowIO, +    readFlow, writeFlow, writeFlowBulk, +    readFlowIO, writeFlowIO, + +    mapPath, +) where + +import Control.Concurrent.STM + + +data Flow r w = Flow (TMVar [r]) (TMVar [w]) +              | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') + +type SymFlow a = Flow a a + +newFlow :: STM (Flow a b, Flow b a) +newFlow = do +    x <- newEmptyTMVar +    y <- newEmptyTMVar +    return (Flow x y, Flow y x) + +newFlowIO :: IO (Flow a b, Flow b a) +newFlowIO = atomically newFlow + +readFlow :: Flow r w -> STM r +readFlow (Flow rvar _) = takeTMVar rvar >>= \case +    (x:[]) -> return x +    (x:xs) -> putTMVar rvar xs >> return x +    [] -> error "Flow: empty list" +readFlow (MappedFlow f _ up) = f <$> readFlow up + +writeFlow :: Flow r w -> w -> STM () +writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (MappedFlow _ f up) = writeFlow up . f + +writeFlowBulk :: Flow r w -> [w] -> STM () +writeFlowBulk _ [] = return () +writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs + +readFlowIO :: Flow r w -> IO r +readFlowIO path = atomically $ readFlow path + +writeFlowIO :: Flow r w -> w -> IO () +writeFlowIO path = atomically . writeFlow path + + +mapPath :: (r -> r') -> (w' -> w) -> Flow r w -> Flow r' w' +mapPath rf wf (MappedFlow rf' wf' up) = MappedFlow (rf . rf') (wf' . wf) up +mapPath rf wf up = MappedFlow rf wf up diff --git a/src/ICE.chs b/src/ICE.chs index 98584a2..d553a88 100644 --- a/src/ICE.chs +++ b/src/ICE.chs @@ -17,7 +17,6 @@ module ICE (  ) where  import Control.Arrow -import Control.Concurrent.Chan  import Control.Concurrent.MVar  import Control.Monad  import Control.Monad.Except @@ -31,6 +30,7 @@ import Data.Text (Text)  import qualified Data.Text as T  import qualified Data.Text.Encoding as T  import qualified Data.Text.Read as T +import Data.Void  import Foreign.C.String  import Foreign.C.Types @@ -39,17 +39,16 @@ import Foreign.Marshal.Array  import Foreign.Ptr  import Foreign.StablePtr +import Flow  import Storage  #include "pjproject.h"  data IceSession = IceSession      { isStrans :: PjIceStrans -    , isChan :: MVar (Either [ByteString] (MappedChan ByteString)) +    , isChan :: MVar (Either [ByteString] (Flow Void ByteString))      } -data MappedChan a = forall b. MappedChan (a -> b) (Chan b) -  instance Eq IceSession where      (==) = (==) `on` isStrans @@ -188,13 +187,13 @@ foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO ()  ice_call_cb :: StablePtr (IO ()) -> IO ()  ice_call_cb = join . deRefStablePtr -iceSetChan :: IceSession -> (ByteString -> a) -> Chan a -> IO () -iceSetChan sess f chan = do +iceSetChan :: IceSession -> Flow Void ByteString -> IO () +iceSetChan sess chan = do      modifyMVar_ (isChan sess) $ \orig -> do          case orig of -             Left buf -> writeList2Chan chan $ map f $ reverse buf +             Left buf -> mapM_ (writeFlowIO chan) $ reverse buf               Right _ -> return () -        return $ Right $ MappedChan f chan +        return $ Right chan  foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO ()  ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () @@ -202,5 +201,5 @@ ice_rx_data sptr buf len = do      sess <- deRefStablePtr sptr      bs <- packCStringLen (buf, len)      modifyMVar_ (isChan sess) $ \case -            mc@(Right (MappedChan f chan)) -> writeChan chan (f bs) >> return mc +            mc@(Right chan) -> writeFlowIO chan bs >> return mc              Left bss -> return $ Left (bs:bss) diff --git a/src/Network.hs b/src/Network.hs index da786c6..787bff9 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -8,7 +8,6 @@ module Network (      Peer, peerServer, peerStorage,      PeerAddress(..), peerAddress,      PeerIdentity(..), peerIdentity, -    PeerChannel(..),      WaitingRef, wrDigest,      Service(..),      serverPeer, serverPeerIce, @@ -18,7 +17,6 @@ module Network (      discoveryPort,  ) where -import Control.Applicative  import Control.Concurrent  import Control.Concurrent.STM  import Control.Exception @@ -45,11 +43,10 @@ import GHC.Conc.Sync (unsafeIOToSTM)  import Network.Socket  import qualified Network.Socket.ByteString as S -import System.Clock -  import Channel  import ICE  import Identity +import Network.Protocol  import PubKey  import Service  import State @@ -70,7 +67,8 @@ data Server = Server      , serverIdentity_ :: MVar UnifiedIdentity      , serverThreads :: MVar [ThreadId]      , serverSocket :: MVar Socket -    , serverChanPacket :: Chan (PeerAddress, BC.ByteString) +    , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) +    , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress      , serverDataResponse :: TQueue (Peer, Maybe PartialRef)      , serverIOActions :: TQueue (ExceptT String IO ())      , serverServices :: [SomeService] @@ -101,30 +99,29 @@ defaultServerOptions = ServerOptions  data Peer = Peer      { peerAddress :: PeerAddress      , peerServer_ :: Server +    , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress))      , peerIdentityVar :: TVar PeerIdentity -    , peerChannel :: TVar PeerChannel      , peerStorage_ :: Storage      , peerInStorage :: PartialStorage -    , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket) -    , peerSentPackets :: TVar [SentPacket]      , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) -    , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)]      , peerWaitingRefs :: TMVar [WaitingRef]      } -data SentPacket = SentPacket -    { spTime :: TimeSpec -    , spRetryCount :: Int -    , spAckedBy :: [TransportHeaderItem] -    , spData :: BC.ByteString -    } -  peerServer :: Peer -> Server  peerServer = peerServer_  peerStorage :: Peer -> Storage  peerStorage = peerStorage_ +getPeerChannel :: Peer -> STM ChannelState +getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection + +setPeerChannel :: Peer -> ChannelState -> STM () +setPeerChannel Peer {..} ch = do +    readTVar peerConnection >>= \case +        Left _ -> retry +        Right conn -> connSetChannel conn ch +  instance Eq Peer where      (==) = (==) `on` peerIdentityVar @@ -157,89 +154,21 @@ data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String                    | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])                    | PeerIdentityFull UnifiedIdentity -data PeerChannel = ChannelWait -                 | ChannelOurRequest (Stored ChannelRequest) -                 | ChannelPeerRequest WaitingRef -                 | ChannelOurAccept (Stored ChannelAccept) Channel -                 | ChannelEstablished Channel -  peerIdentity :: MonadIO m => Peer -> m PeerIdentity  peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data TransportPacket = TransportPacket TransportHeader [Ref] - - -data TransportHeaderItem -    = Acknowledged PartialRef -    | Rejected PartialRef -    | DataRequest PartialRef -    | DataResponse PartialRef -    | AnnounceSelf PartialRef -    | AnnounceUpdate PartialRef -    | TrChannelRequest PartialRef -    | TrChannelAccept PartialRef -    | ServiceType ServiceID -    | ServiceRef PartialRef -    deriving (Eq) - -data TransportHeader = TransportHeader [TransportHeaderItem] - -transportToObject :: TransportHeader -> PartialObject -transportToObject (TransportHeader items) = Rec $ map single items -    where single = \case -              Acknowledged ref -> (BC.pack "ACK", RecRef ref) -              Rejected ref -> (BC.pack "REJ", RecRef ref) -              DataRequest ref -> (BC.pack "REQ", RecRef ref) -              DataResponse ref -> (BC.pack "RSP", RecRef ref) -              AnnounceSelf ref -> (BC.pack "ANN", RecRef ref) -              AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref) -              TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref) -              TrChannelAccept ref -> (BC.pack "CAC", RecRef ref) -              ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype) -              ServiceRef ref -> (BC.pack "SRF", RecRef ref) - -transportFromObject :: PartialObject -> Maybe TransportHeader -transportFromObject (Rec items) = case catMaybes $ map single items of -                                       [] -> Nothing -                                       titems -> Just $ TransportHeader titems -    where single (name, content) = if -              | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref -              | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected ref -              | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref -              | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref -              | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref -              | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref -              | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref -              | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref -              | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid -              | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref -              | otherwise -> Nothing -transportFromObject _ = Nothing -  lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID  lookupServiceType (ServiceType stype : _) = Just stype  lookupServiceType (_ : hs) = lookupServiceType hs  lookupServiceType [] = Nothing -data WaitingRef = WaitingRef -    { wrefStorage :: Storage -    , wrefPartial :: PartialRef -    , wrefAction :: Ref -> WaitingRefCallback -    , wrefStatus :: TVar (Either [RefDigest] Ref) -    } - -type WaitingRefCallback = ExceptT String IO () - -wrDigest :: WaitingRef -> RefDigest -wrDigest = refDigest . wrefPartial - -newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef -newWaitingRef pref act = do -    peer <- gets phPeer -    wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left [])) -    modifyTMVarP (peerWaitingRefs peer) (wref:) +newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef dgst act = do +    peer@Peer {..} <- gets phPeer +    wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) +    modifyTMVarP peerWaitingRefs (wref:)      liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)      return wref @@ -254,7 +183,8 @@ startServer opt serverOrigHead logd' serverServices = do      serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead      serverThreads <- newMVar []      serverSocket <- newEmptyMVar -    serverChanPacket <- newChan +    (serverRawPath, protocolRawPath) <- newFlowIO +    (serverNewConnection, protocolNewConnection) <- newFlowIO      serverDataResponse <- newTQueueIO      serverIOActions <- newTQueueIO      serverServiceStates <- newTMVarIO M.empty @@ -289,23 +219,23 @@ startServer opt serverOrigHead logd' serverServices = do              when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do                  readMVar serverIdentity_ >>= \identity -> do                      st <- derivePartialStorage serverStorage -                    let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] +                    let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]                      mapM_ (void . S.sendTo sock packet) broadcastAddreses                  threadDelay $ announceIntervalSeconds * 1000 * 1000              let announceUpdate identity = do                      st <- derivePartialStorage serverStorage                      let selfRef = partialRef st $ storedRef $ idData identity -                        updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity) +                        updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity)                          ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]                          hitems = map AnnounceUpdate updateRefs                          packet = TransportPacket (TransportHeader $  hitems) []                      ps <- readMVar serverPeers                      forM_ ps $ \peer -> atomically $ do -                        ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case +                        ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case                              (PeerIdentityFull _, ChannelEstablished _) -> -                                writeTQueue (peerOutQueue peer) (True, ackedBy, packet) +                                sendToPeerS peer ackedBy packet                              _  -> return ()              void $ watchHead serverOrigHead $ \h -> do @@ -325,42 +255,38 @@ startServer opt serverOrigHead logd' serverServices = do              forkServerThread server $ forever $ do                  (msg, saddr) <- S.recvFrom sock 4096 -                writeChan serverChanPacket (DatagramAddress sock saddr, msg) +                writeFlowIO serverRawPath (DatagramAddress sock saddr, msg) -            forever $ do -                (paddr, msg) <- readChan serverChanPacket -                (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do -                    case M.lookup paddr pvalue of -                        Just peer -> do -                            mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case -                                ChannelEstablished ch -> Just ch -                                ChannelOurAccept _ ch -> Just ch -                                _                     -> Nothing - -                            if  | Just ch <- mbch -                                , Right plain <- runExcept $ channelDecrypt ch msg -                                -> return (pvalue, (peer, plain, True)) - -                                | otherwise -                                -> return (pvalue, (peer, msg, False)) +            forkServerThread server $ forever $ do +                (paddr, msg) <- readFlowIO serverRawPath +                case paddr of +                    DatagramAddress _ addr -> void $ S.sendTo sock msg addr +                    PeerIceSession ice     -> iceSend ice msg +            forkServerThread server $ forever $ do +                conn <- readFlowIO serverNewConnection +                let paddr = connAddress conn +                peer <- modifyMVar serverPeers $ \pvalue -> do +                    case M.lookup paddr pvalue of +                        Just peer -> return (pvalue, peer)                          Nothing -> do                              peer <- mkPeer server paddr -                            return (M.insert paddr peer pvalue, (peer, msg, False)) +                            return (M.insert paddr peer pvalue, peer) -                case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of -                     Right (obj:objs) -                         | Just header <- transportFromObject obj -> do -                               prefs <- forM objs $ storeObject $ peerInStorage peer -                               identity <- readMVar serverIdentity_ -                               let svcs = map someServiceID serverServices -                               handlePacket identity secure peer chanSvc svcs header prefs +                atomically $ do +                    readTVar (peerConnection peer) >>= \case +                        Left packets -> writeFlowBulk (connData conn) $ reverse packets +                        Right _ -> return () +                    writeTVar (peerConnection peer) (Right conn) -                         | otherwise -> atomically $ do -                               logd $ show paddr ++ ": invalid objects" -                               logd $ show objs +                forkServerThread server $ forever $ do +                    (secure, TransportPacket header objs) <- readFlowIO $ connData conn +                    prefs <- forM objs $ storeObject $ peerInStorage peer +                    identity <- readMVar serverIdentity_ +                    let svcs = map someServiceID serverServices +                    handlePacket identity secure peer chanSvc svcs header prefs -                     _ -> do atomically $ logd $ show paddr ++ ": invalid objects" +            erebosNetworkProtocol logd protocolRawPath protocolNewConnection      forkServerThread server $ withSocketsDo $ do          let hints = defaultHints @@ -383,87 +309,6 @@ stopServer :: Server -> IO ()  stopServer Server {..} = do      mapM_ killThread =<< takeMVar serverThreads -sendWorker :: Peer -> IO () -sendWorker peer = do -    startTime <- getTime MonotonicRaw -    nowVar <- newTVarIO startTime -    waitVar <- newTVarIO startTime - -    let waitTill time = forkServerThread (peerServer peer) $ do -            now <- getTime MonotonicRaw -            when (time > now) $ -                threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 -            atomically . writeTVar nowVar =<< getTime MonotonicRaw - -    let sendBytes sp = do -            when (not $ null $ spAckedBy sp) $ do -                now <- getTime MonotonicRaw -                atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp -                    { spTime = now -                    , spRetryCount = spRetryCount sp + 1 -                    } -            case peerAddress peer of -                DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr -                PeerIceSession ice        -> iceSend ice (spData sp) - -    let sendNextPacket = do -            (secure, ackedBy, packet@(TransportPacket header content)) <- -                readTQueue (peerOutQueue peer) - -            let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) -            let plain = BL.toStrict $ BL.concat $ -                    (serializeObject $ transportToObject header) -                    : map lazyLoadBytes content - -            mbch <- readTVar (peerChannel peer) >>= \case -                ChannelEstablished ch -> return (Just ch) -                _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):) -                        return Nothing - -            return $ do -                mbs <- case mbch of -                    Just ch -> do -                        runExceptT (channelEncrypt ch plain) >>= \case -                            Right ctext -> return $ Just ctext -                            Left err -> do logd $ "Failed to encrypt data: " ++ err -                                           return Nothing -                    Nothing | secure    -> return Nothing -                            | otherwise -> return $ Just plain - -                case mbs of -                    Just bs -> do -                        sendBytes $ SentPacket -                            { spTime = undefined -                            , spRetryCount = -1 -                            , spAckedBy = ackedBy -                            , spData = bs -                            } -                    Nothing -> return () - -    let retransmitPacket = do -            now <- readTVar nowVar -            (sp, rest) <- readTVar (peerSentPackets peer) >>= \case -                sps@(_:_) -> return (last sps, init sps) -                _         -> retry -            let nextTry = spTime sp + fromNanoSecs 1000000000 -            if now < nextTry -              then do -                wait <- readTVar waitVar -                if wait <= now || nextTry < wait -                   then do writeTVar waitVar nextTry -                           return $ waitTill nextTry -                   else retry -              else do -                writeTVar (peerSentPackets peer) rest -                return $ sendBytes sp - -    forever $ join $ atomically $ do -        retransmitPacket <|> sendNextPacket - -processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM () -processAcknowledgements peer = mapM_ $ \hitem -> do -    modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy -  dataResponseWorker :: Server -> IO ()  dataResponseWorker server = forever $ do      (peer, npref) <- atomically (readTQueue $ serverDataResponse server) @@ -489,7 +334,7 @@ dataResponseWorker server = forever $ do              Right _ -> return (Nothing, [])      atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list -    let reqs = concat $ map snd list +    let reqs = map refDigest $ concat $ map snd list      when (not $ null reqs) $ do          let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []              ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] @@ -540,8 +385,7 @@ handlePacket :: UnifiedIdentity -> Bool      -> TransportHeader -> [PartialRef] -> IO ()  handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do      let server = peerServer peer -    processAcknowledgements peer headers -    ochannel <- readTVar $ peerChannel peer +    ochannel <- getPeerChannel peer      let sidentity = idData identity          plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat              [ [ storedRef sidentity ] @@ -555,89 +399,89 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =      res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do          let logd = liftSTM . writeTQueue (serverErrorLog server)          forM_ headers $ \case -            Acknowledged ref -> do -                readTVarP (peerChannel peer) >>= \case -                    ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do -                        writeTVarP (peerChannel peer) $ ChannelEstablished ch -                        liftSTM $ finalizedChannel peer identity +            Acknowledged dgst -> do +                liftSTM (getPeerChannel peer) >>= \case +                    ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do +                        liftSTM $ finalizedChannel peer ch identity                      _ -> return () -            Rejected ref -> do -                logd $ "rejected by peer: " ++ show (refDigest ref) +            Rejected dgst -> do +                logd $ "rejected by peer: " ++ show dgst -            DataRequest ref -                | secure || refDigest ref `elem` plaintextRefs -> do -                    Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref -                    addHeader $ DataResponse ref -                    addAckedBy [ Acknowledged ref, Rejected ref ] +            DataRequest dgst +                | secure || dgst `elem` plaintextRefs -> do +                    Right mref <- liftSTM $ unsafeIOToSTM $ +                        copyRef (peerStorage peer) $ +                        partialRefFromDigest (peerInStorage peer) dgst +                    addHeader $ DataResponse dgst +                    addAckedBy [ Acknowledged dgst, Rejected dgst ]                      addBody $ mref                  | otherwise -> do -                    logd $ "unauthorized data request for " ++ show ref -                    addHeader $ Rejected ref +                    logd $ "unauthorized data request for " ++ show dgst +                    addHeader $ Rejected dgst -            DataResponse ref -> if -                | ref `elem` prefs -> do -                    addHeader $ Acknowledged ref -                    liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref) -                | otherwise -> throwError $ "mismatched data response " ++ show ref +            DataResponse dgst -> if +                | Just pref <- find ((==dgst) . refDigest) prefs -> do +                    addHeader $ Acknowledged dgst +                    liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) +                | otherwise -> throwError $ "mismatched data response " ++ show dgst -            AnnounceSelf pref -                | refDigest pref == refDigest (storedRef sidentity) -> return () +            AnnounceSelf dgst +                | dgst == refDigest (storedRef sidentity) -> return ()                  | otherwise -> do -                    wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer +                    wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer                      readTVarP (peerIdentityVar peer) >>= \case                          PeerIdentityUnknown idwait -> do -                            let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity -                            addHeader $ AnnounceSelf ref +                            addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity                              writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait                              liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer                          _ -> return () -            AnnounceUpdate ref -> do +            AnnounceUpdate dgst -> do                  readTVarP (peerIdentityVar peer) >>= \case                      PeerIdentityFull _ -> do -                        void $ newWaitingRef ref $ handleIdentityUpdate peer -                        addHeader $ Acknowledged ref +                        void $ newWaitingRef dgst $ handleIdentityUpdate peer +                        addHeader $ Acknowledged dgst                      _ -> return () -            TrChannelRequest reqref -> do +            TrChannelRequest dgst -> do                  let process = do -                        addHeader $ Acknowledged reqref -                        wref <- newWaitingRef reqref $ handleChannelRequest peer identity -                        writeTVarP (peerChannel peer) $ ChannelPeerRequest wref -                    reject = addHeader $ Rejected reqref - -                readTVarP (peerChannel peer) >>= \case -                    ChannelWait {} -> process -                    ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process +                        addHeader $ Acknowledged dgst +                        wref <- newWaitingRef dgst $ handleChannelRequest peer identity +                        liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref +                    reject = addHeader $ Rejected dgst + +                liftSTM (getPeerChannel peer) >>= \case +                    ChannelNone {} -> process +                    ChannelOurRequest our | dgst < refDigest (storedRef our) -> process                                            | otherwise -> reject                      ChannelPeerRequest {} -> process                      ChannelOurAccept {} -> reject                      ChannelEstablished {} -> process -            TrChannelAccept accref -> do +            TrChannelAccept dgst -> do                  let process = do -                        handleChannelAccept identity accref -                readTVarP (peerChannel peer) >>= \case -                    ChannelWait {} -> process +                        handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst +                liftSTM (getPeerChannel peer) >>= \case +                    ChannelNone {} -> process                      ChannelOurRequest {} -> process                      ChannelPeerRequest {} -> process -                    ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process -                                           | otherwise -> addHeader $ Rejected accref +                    ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process +                                           | otherwise -> addHeader $ Rejected dgst                      ChannelEstablished {} -> process              ServiceType _ -> return () -            ServiceRef pref +            ServiceRef dgst                  | not secure -> throwError $ "service packet without secure channel"                  | Just svc <- lookupServiceType headers -> if                      | svc `elem` svcs -> do -                        if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -} +                        if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}                             then do -                                addHeader $ Acknowledged pref -                                void $ newWaitingRef pref $ \ref -> +                                addHeader $ Acknowledged dgst +                                void $ newWaitingRef dgst $ \ref ->                                      liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) -                           else throwError $ "missing service object " ++ show pref -                    | otherwise -> addHeader $ Rejected pref +                           else throwError $ "missing service object " ++ show dgst +                    | otherwise -> addHeader $ Rejected dgst                  | otherwise -> throwError $ "service ref without type"      let logd = writeTQueue (serverErrorLog server) @@ -647,7 +491,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =          Right ph -> do              when (not $ null $ phHead ph) $ do                  let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) -                writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet) +                sendToPeerS' secure peer (phAckedBy ph) packet  withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () @@ -660,18 +504,17 @@ withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer  setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback  setupChannel identity peer upid = do      req <- createChannelRequest (peerStorage peer) identity upid -    let ist = peerInStorage peer -    let reqref = partialRef ist $ storedRef req +    let reqref = refDigest $ storedRef req      let hitems =              [ TrChannelRequest reqref -            , AnnounceSelf $ partialRef ist $ storedRef $ idData identity +            , AnnounceSelf $ refDigest $ storedRef $ idData identity              ]      liftIO $ atomically $ do -        readTVar (peerChannel peer) >>= \case -            ChannelWait -> do +        getPeerChannel peer >>= \case +            ChannelNone -> do                  sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $                      TransportPacket (TransportHeader hitems) [storedRef req] -                writeTVar (peerChannel peer) $ ChannelOurRequest req +                setPeerChannel peer $ ChannelOurRequest req              _ -> return ()  handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -679,10 +522,10 @@ handleChannelRequest peer identity req = do      withPeerIdentity peer $ \upid -> do          (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req)          liftIO $ atomically $ do -            readTVar (peerChannel peer) >>= \case +            getPeerChannel peer >>= \case                  ChannelPeerRequest wr | wrDigest wr == refDigest req -> do -                    writeTVar (peerChannel peer) $ ChannelOurAccept acc ch -                    let accref = (partialRef (peerInStorage peer) $ storedRef acc) +                    setPeerChannel peer $ ChannelOurAccept acc ch +                    let accref = refDigest $ storedRef acc                          header = TrChannelAccept accref                          ackedBy = [ Acknowledged accref, Rejected accref ]                      sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat @@ -702,32 +545,28 @@ handleChannelAccept identity accref = do                  Right acc -> do                      ch <- acceptedChannel identity upid (wrappedLoad acc)                      liftIO $ atomically $ do -                        sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] -                        writeTVar (peerChannel peer) $ ChannelEstablished ch -                        finalizedChannel peer identity +                        sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] +                        finalizedChannel peer ch identity                  Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> UnifiedIdentity -> STM () -finalizedChannel peer self = do -    let ist = peerInStorage peer +finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () +finalizedChannel peer@Peer {..} ch self = do +    setPeerChannel peer $ ChannelEstablished ch      -- Identity update -    do -        let selfRef = partialRef ist $ storedRef $ idData $ self -            updateRefs = selfRef : map (partialRef ist . storedRef) (idUpdates self) +    writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do +        let selfRef = refDigest $ storedRef $ idData $ self +            updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self)              ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]          sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs      -- Notify services about new peer -    readTVar (peerIdentityVar peer) >>= \case +    readTVar peerIdentityVar >>= \case          PeerIdentityFull _ -> notifyServicesOfPeer peer          _ -> return () -    -- Outstanding service packets -    mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] -  handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback  handleIdentityAnnounce self peer ref = liftIO $ atomically $ do @@ -775,22 +614,14 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do  mkPeer :: Server -> PeerAddress -> IO Peer -mkPeer server paddr = do -    pst <- deriveEphemeralStorage $ serverStorage server -    peer <- Peer -        <$> pure paddr -        <*> pure server -        <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) -        <*> newTVarIO ChannelWait -        <*> pure pst -        <*> derivePartialStorage pst -        <*> newTQueueIO -        <*> newTVarIO [] -        <*> newTMVarIO M.empty -        <*> newTVarIO [] -        <*> newTMVarIO [] -    forkServerThread server $ sendWorker peer -    return peer +mkPeer peerServer_ peerAddress = do +    peerConnection <- newTVarIO (Left []) +    peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO [] +    peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_ +    peerInStorage <- derivePartialStorage peerStorage_ +    peerServiceState <- newTMVarIO M.empty +    peerWaitingRefs <- newTMVarIO [] +    return Peer {..}  serverPeer :: Server -> SockAddr -> IO Peer  serverPeer server paddr = do @@ -798,10 +629,10 @@ serverPeer server paddr = do      serverPeer' server (DatagramAddress sock paddr)  serverPeerIce :: Server -> IceSession -> IO Peer -serverPeerIce server ice = do +serverPeerIce server@Server {..} ice = do      let paddr = PeerIceSession ice      peer <- serverPeer' server paddr -    iceSetChan ice (paddr,) $ serverChanPacket server +    iceSetChan ice $ mapPath undefined (paddr,) serverRawPath      return peer  serverPeer' :: Server -> PeerAddress -> IO Peer @@ -814,9 +645,10 @@ serverPeer' server paddr = do                   return (M.insert paddr peer pvalue, (peer, True))      when hello $ do          identity <- serverIdentity server -        atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $ -            TransportPacket -                (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) +        atomically $ do +            writeFlow (serverNewConnection server) paddr +            sendToPeerPlain peer [] $ TransportPacket +                (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ])                  []      return peer @@ -830,23 +662,28 @@ sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket  sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()  sendToPeerList peer parts = do      let st = peerStorage peer -        pst = peerInStorage peer      srefs <- liftIO $ fmap catMaybes $ forM parts $ \case          ServiceReply (Left x) use -> Just . (,use) <$> store st x          ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)          ServiceFinally act -> act >> return Nothing -    prefs <- mapM (copyRef pst . fst) srefs +    let dgsts = map (refDigest . fst) srefs      let content = map fst $ filter snd srefs -        header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) +        header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)          packet = TransportPacket header content -        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ] +        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]      liftIO $ atomically $ sendToPeerS peer ackedBy packet -sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet) +sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS' secure Peer {..} ackedBy packet = do +    readTVar peerConnection >>= \case +        Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs +        Right conn -> writeFlow (connData conn) (secure, packet, ackedBy) + +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS = sendToPeerS' True -sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet) +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerPlain = sendToPeerS' False  sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()  sendToPeerWith peer fobj = do diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs new file mode 100644 index 0000000..adc9471 --- /dev/null +++ b/src/Network/Protocol.hs @@ -0,0 +1,325 @@ +module Network.Protocol ( +    TransportPacket(..), +    transportToObject, +    TransportHeader(..), +    TransportHeaderItem(..), + +    WaitingRef(..), +    WaitingRefCallback, +    wrDigest, + +    ChannelState(..), + +    erebosNetworkProtocol, + +    Connection, +    connAddress, +    connData, +    connGetChannel, +    connSetChannel, + +    module Flow, +) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.Except + +import Data.ByteString (ByteString) +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL +import Data.List +import Data.Maybe + +import System.Clock + +import Channel +import Flow +import Service +import Storage + + +data TransportPacket a = TransportPacket TransportHeader [a] + +data TransportHeader = TransportHeader [TransportHeaderItem] + +data TransportHeaderItem +    = Acknowledged RefDigest +    | Rejected RefDigest +    | DataRequest RefDigest +    | DataResponse RefDigest +    | AnnounceSelf RefDigest +    | AnnounceUpdate RefDigest +    | TrChannelRequest RefDigest +    | TrChannelAccept RefDigest +    | ServiceType ServiceID +    | ServiceRef RefDigest +    deriving (Eq) + +transportToObject :: PartialStorage -> TransportHeader -> PartialObject +transportToObject st (TransportHeader items) = Rec $ map single items +    where single = \case +              Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) +              Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst) +              DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst) +              DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst) +              AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst) +              AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst) +              TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst) +              TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst) +              ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype) +              ServiceRef dgst -> (BC.pack "SRF", RecRef $ partialRefFromDigest st dgst) + +transportFromObject :: PartialObject -> Maybe TransportHeader +transportFromObject (Rec items) = case catMaybes $ map single items of +                                       [] -> Nothing +                                       titems -> Just $ TransportHeader titems +    where single (name, content) = if +              | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref +              | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref +              | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref +              | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref +              | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref +              | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref +              | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref +              | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref +              | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid +              | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref +              | otherwise -> Nothing +transportFromObject _ = Nothing + + +data GlobalState addr = (Eq addr, Show addr) => GlobalState +    { gConnections :: TVar [Connection addr] +    , gDataFlow :: SymFlow (addr, ByteString) +    , gConnectionFlow :: Flow addr (Connection addr) +    , gLog :: String -> STM () +    , gStorage :: PartialStorage +    , gNowVar :: TVar TimeSpec +    , gNextTimeout :: TVar TimeSpec +    } + +data Connection addr = Connection +    { cAddress :: addr +    , cDataUp :: Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) +    , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) +    , cChannel :: TVar ChannelState +    , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) +    , cSentPackets :: TVar [SentPacket] +    } + +connAddress :: Connection addr -> addr +connAddress = cAddress + +connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) +connData = cDataUp + +connGetChannel :: Connection addr -> STM ChannelState +connGetChannel Connection {..} = readTVar cChannel + +connSetChannel :: Connection addr -> ChannelState -> STM () +connSetChannel Connection {..} ch = do +    writeTVar cChannel ch + + +data WaitingRef = WaitingRef +    { wrefStorage :: Storage +    , wrefPartial :: PartialRef +    , wrefAction :: Ref -> WaitingRefCallback +    , wrefStatus :: TVar (Either [RefDigest] Ref) +    } + +type WaitingRefCallback = ExceptT String IO () + +wrDigest :: WaitingRef -> RefDigest +wrDigest = refDigest . wrefPartial + + +data ChannelState = ChannelNone +                  | ChannelOurRequest (Stored ChannelRequest) +                  | ChannelPeerRequest WaitingRef +                  | ChannelOurAccept (Stored ChannelAccept) Channel +                  | ChannelEstablished Channel + + +data SentPacket = SentPacket +    { spTime :: TimeSpec +    , spRetryCount :: Int +    , spAckedBy :: [TransportHeaderItem] +    , spData :: BC.ByteString +    } + + +erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) +                      => (String -> STM ()) +                      -> SymFlow (addr, ByteString) +                      -> Flow addr (Connection addr) +                      -> IO () +erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do +    gConnections <- newTVarIO [] +    gStorage <- derivePartialStorage =<< memoryStorage + +    startTime <- getTime MonotonicRaw +    gNowVar <- newTVarIO startTime +    gNextTimeout <- newTVarIO startTime + +    let gs = GlobalState {..} + +    let signalTimeouts = forever $ do +            now <- getTime MonotonicRaw +            next <- atomically $ do +                writeTVar gNowVar now +                readTVar gNextTimeout + +            let waitTill time +                    | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 +                    | otherwise = threadDelay maxBound +                waitForUpdate = atomically $ do +                    next' <- readTVar gNextTimeout +                    when (next' == next) retry + +            race_ (waitTill next) waitForUpdate + +    race_ signalTimeouts $ forever $ join $ atomically $ +        processIncomming gs <|> processOutgoing gs + + + +getConnection :: GlobalState addr -> addr -> STM (Connection addr) +getConnection GlobalState {..} addr = do +    conns <- readTVar gConnections +    case find ((addr==) . cAddress) conns of +        Just conn -> return conn +        Nothing -> do +            let cAddress = addr +            (cDataUp, cDataInternal) <- newFlow +            cChannel <- newTVar ChannelNone +            cSecureOutQueue <- newTQueue +            cSentPackets <- newTVar [] +            let conn = Connection {..} + +            writeTVar gConnections (conn : conns) +            writeFlow gConnectionFlow conn +            return conn + +processIncomming :: GlobalState addr -> STM (IO ()) +processIncomming gs@GlobalState {..} = do +    (addr, msg) <- readFlow gDataFlow +    conn@Connection {..} <- getConnection gs addr + +    mbch <- readTVar cChannel >>= return . \case +        ChannelEstablished ch -> Just ch +        ChannelOurAccept _ ch -> Just ch +        _                     -> Nothing + +    return $ do +        (content, secure) <- do +            if  | Just ch <- mbch +                -> runExceptT (channelDecrypt ch msg) >>= \case +                    Right plain -> return (plain, True) +                    _ -> return (msg, False) + +                | otherwise +                -> return (msg, False) + +        case runExcept $ deserializeObjects gStorage $ BL.fromStrict content of +            Right (obj:objs) +                | Just header@(TransportHeader items) <- transportFromObject obj -> atomically $ do +                    processAcknowledgements gs conn items +                    writeFlow cDataInternal (secure, TransportPacket header objs) + +                | otherwise -> atomically $ do +                      gLog $ show cAddress ++ ": invalid objects" +                      gLog $ show objs + +            _ -> do atomically $ gLog $ show cAddress ++ ": invalid objects" + + +processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) +processOutgoing gs@GlobalState {..} = do +    let sendBytes :: Connection addr -> SentPacket -> IO () +        sendBytes Connection {..} sp = do +            now <- getTime MonotonicRaw +            atomically $ do +                when (not $ null $ spAckedBy sp) $ do +                    modifyTVar' cSentPackets $ (:) sp +                        { spTime = now +                        , spRetryCount = spRetryCount sp + 1 +                        } +                writeFlow gDataFlow (cAddress, spData sp) + +    let sendNextPacket :: Connection addr -> STM (IO ()) +        sendNextPacket conn@Connection {..} = do +            mbch <- readTVar cChannel >>= return . \case +                ChannelEstablished ch -> Just ch +                _                     -> Nothing + +            let checkOutstanding +                    | isJust mbch = readTQueue cSecureOutQueue +                    | otherwise = retry + +            (secure, packet@(TransportPacket header content), ackedBy) <- +                checkOutstanding <|> readFlow cDataInternal + +            let plain = BL.toStrict $ BL.concat $ +                    (serializeObject $ transportToObject gStorage header) +                    : map lazyLoadBytes content + +            when (isNothing mbch && secure) $ do +                writeTQueue cSecureOutQueue (secure, packet, ackedBy) + +            return $ do +                mbs <- case mbch of +                    Just ch -> do +                        runExceptT (channelEncrypt ch plain) >>= \case +                            Right ctext -> return $ Just ctext +                            Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err +                                           return Nothing +                    Nothing | secure    -> return Nothing +                            | otherwise -> return $ Just plain + +                case mbs of +                    Just bs -> do +                        sendBytes conn $ SentPacket +                            { spTime = undefined +                            , spRetryCount = -1 +                            , spAckedBy = ackedBy +                            , spData = bs +                            } +                    Nothing -> return () + +    let retransmitPacket :: Connection addr -> STM (IO ()) +        retransmitPacket conn@Connection {..} = do +            now <- readTVar gNowVar +            (sp, rest) <- readTVar cSentPackets >>= \case +                sps@(_:_) -> return (last sps, init sps) +                _         -> retry +            let nextTry = spTime sp + fromNanoSecs 1000000000 +            if now < nextTry +              then do +                nextTimeout <- readTVar gNextTimeout +                if nextTimeout <= now || nextTry < nextTimeout +                   then do writeTVar gNextTimeout nextTry +                           return $ return () +                   else retry +              else do +                writeTVar cSentPackets rest +                return $ sendBytes conn sp + +    let establishNewConnection = do +            _ <- getConnection gs =<< readFlow gConnectionFlow +            return $ return () + +    conns <- readTVar gConnections +    msum $ concat $ +        [ map retransmitPacket conns +        , map sendNextPacket conns +        , [ establishNewConnection ] +        ] + +processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM () +processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do +    modifyTVar' cSentPackets $ filter $ (hitem `notElem`) . spAckedBy |