From bda62efef1ad38779f23b38b4e1436f06fb9c7c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Tue, 1 Aug 2023 23:01:30 +0200 Subject: Network protocol refactoring with explicit data flows --- src/Network.hs | 451 ++++++++++++++++++--------------------------------------- 1 file changed, 144 insertions(+), 307 deletions(-) (limited to 'src/Network.hs') diff --git a/src/Network.hs b/src/Network.hs index da786c6..787bff9 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -8,7 +8,6 @@ module Network ( Peer, peerServer, peerStorage, PeerAddress(..), peerAddress, PeerIdentity(..), peerIdentity, - PeerChannel(..), WaitingRef, wrDigest, Service(..), serverPeer, serverPeerIce, @@ -18,7 +17,6 @@ module Network ( discoveryPort, ) where -import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Exception @@ -45,11 +43,10 @@ import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket import qualified Network.Socket.ByteString as S -import System.Clock - import Channel import ICE import Identity +import Network.Protocol import PubKey import Service import State @@ -70,7 +67,8 @@ data Server = Server , serverIdentity_ :: MVar UnifiedIdentity , serverThreads :: MVar [ThreadId] , serverSocket :: MVar Socket - , serverChanPacket :: Chan (PeerAddress, BC.ByteString) + , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) + , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress , serverDataResponse :: TQueue (Peer, Maybe PartialRef) , serverIOActions :: TQueue (ExceptT String IO ()) , serverServices :: [SomeService] @@ -101,30 +99,29 @@ defaultServerOptions = ServerOptions data Peer = Peer { peerAddress :: PeerAddress , peerServer_ :: Server + , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress)) , peerIdentityVar :: TVar PeerIdentity - , peerChannel :: TVar PeerChannel , peerStorage_ :: Storage , peerInStorage :: PartialStorage - , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket) - , peerSentPackets :: TVar [SentPacket] , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) - , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)] , peerWaitingRefs :: TMVar [WaitingRef] } -data SentPacket = SentPacket - { spTime :: TimeSpec - , spRetryCount :: Int - , spAckedBy :: [TransportHeaderItem] - , spData :: BC.ByteString - } - peerServer :: Peer -> Server peerServer = peerServer_ peerStorage :: Peer -> Storage peerStorage = peerStorage_ +getPeerChannel :: Peer -> STM ChannelState +getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection + +setPeerChannel :: Peer -> ChannelState -> STM () +setPeerChannel Peer {..} ch = do + readTVar peerConnection >>= \case + Left _ -> retry + Right conn -> connSetChannel conn ch + instance Eq Peer where (==) = (==) `on` peerIdentityVar @@ -157,89 +154,21 @@ data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) | PeerIdentityFull UnifiedIdentity -data PeerChannel = ChannelWait - | ChannelOurRequest (Stored ChannelRequest) - | ChannelPeerRequest WaitingRef - | ChannelOurAccept (Stored ChannelAccept) Channel - | ChannelEstablished Channel - peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data TransportPacket = TransportPacket TransportHeader [Ref] - - -data TransportHeaderItem - = Acknowledged PartialRef - | Rejected PartialRef - | DataRequest PartialRef - | DataResponse PartialRef - | AnnounceSelf PartialRef - | AnnounceUpdate PartialRef - | TrChannelRequest PartialRef - | TrChannelAccept PartialRef - | ServiceType ServiceID - | ServiceRef PartialRef - deriving (Eq) - -data TransportHeader = TransportHeader [TransportHeaderItem] - -transportToObject :: TransportHeader -> PartialObject -transportToObject (TransportHeader items) = Rec $ map single items - where single = \case - Acknowledged ref -> (BC.pack "ACK", RecRef ref) - Rejected ref -> (BC.pack "REJ", RecRef ref) - DataRequest ref -> (BC.pack "REQ", RecRef ref) - DataResponse ref -> (BC.pack "RSP", RecRef ref) - AnnounceSelf ref -> (BC.pack "ANN", RecRef ref) - AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref) - TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref) - TrChannelAccept ref -> (BC.pack "CAC", RecRef ref) - ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype) - ServiceRef ref -> (BC.pack "SRF", RecRef ref) - -transportFromObject :: PartialObject -> Maybe TransportHeader -transportFromObject (Rec items) = case catMaybes $ map single items of - [] -> Nothing - titems -> Just $ TransportHeader titems - where single (name, content) = if - | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref - | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected ref - | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref - | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref - | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref - | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref - | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref - | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref - | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid - | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref - | otherwise -> Nothing -transportFromObject _ = Nothing - lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID lookupServiceType (ServiceType stype : _) = Just stype lookupServiceType (_ : hs) = lookupServiceType hs lookupServiceType [] = Nothing -data WaitingRef = WaitingRef - { wrefStorage :: Storage - , wrefPartial :: PartialRef - , wrefAction :: Ref -> WaitingRefCallback - , wrefStatus :: TVar (Either [RefDigest] Ref) - } - -type WaitingRefCallback = ExceptT String IO () - -wrDigest :: WaitingRef -> RefDigest -wrDigest = refDigest . wrefPartial - -newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef -newWaitingRef pref act = do - peer <- gets phPeer - wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left [])) - modifyTMVarP (peerWaitingRefs peer) (wref:) +newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef dgst act = do + peer@Peer {..} <- gets phPeer + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) + modifyTMVarP peerWaitingRefs (wref:) liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) return wref @@ -254,7 +183,8 @@ startServer opt serverOrigHead logd' serverServices = do serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead serverThreads <- newMVar [] serverSocket <- newEmptyMVar - serverChanPacket <- newChan + (serverRawPath, protocolRawPath) <- newFlowIO + (serverNewConnection, protocolNewConnection) <- newFlowIO serverDataResponse <- newTQueueIO serverIOActions <- newTQueueIO serverServiceStates <- newTMVarIO M.empty @@ -289,23 +219,23 @@ startServer opt serverOrigHead logd' serverServices = do when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do readMVar serverIdentity_ >>= \identity -> do st <- derivePartialStorage serverStorage - let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] + let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] mapM_ (void . S.sendTo sock packet) broadcastAddreses threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage let selfRef = partialRef st $ storedRef $ idData identity - updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity) + updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity) ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] hitems = map AnnounceUpdate updateRefs packet = TransportPacket (TransportHeader $ hitems) [] ps <- readMVar serverPeers forM_ ps $ \peer -> atomically $ do - ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case + ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case (PeerIdentityFull _, ChannelEstablished _) -> - writeTQueue (peerOutQueue peer) (True, ackedBy, packet) + sendToPeerS peer ackedBy packet _ -> return () void $ watchHead serverOrigHead $ \h -> do @@ -325,42 +255,38 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 - writeChan serverChanPacket (DatagramAddress sock saddr, msg) + writeFlowIO serverRawPath (DatagramAddress sock saddr, msg) - forever $ do - (paddr, msg) <- readChan serverChanPacket - (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do - case M.lookup paddr pvalue of - Just peer -> do - mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ ch -> Just ch - _ -> Nothing - - if | Just ch <- mbch - , Right plain <- runExcept $ channelDecrypt ch msg - -> return (pvalue, (peer, plain, True)) - - | otherwise - -> return (pvalue, (peer, msg, False)) + forkServerThread server $ forever $ do + (paddr, msg) <- readFlowIO serverRawPath + case paddr of + DatagramAddress _ addr -> void $ S.sendTo sock msg addr + PeerIceSession ice -> iceSend ice msg + forkServerThread server $ forever $ do + conn <- readFlowIO serverNewConnection + let paddr = connAddress conn + peer <- modifyMVar serverPeers $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> return (pvalue, peer) Nothing -> do peer <- mkPeer server paddr - return (M.insert paddr peer pvalue, (peer, msg, False)) + return (M.insert paddr peer pvalue, peer) - case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of - Right (obj:objs) - | Just header <- transportFromObject obj -> do - prefs <- forM objs $ storeObject $ peerInStorage peer - identity <- readMVar serverIdentity_ - let svcs = map someServiceID serverServices - handlePacket identity secure peer chanSvc svcs header prefs + atomically $ do + readTVar (peerConnection peer) >>= \case + Left packets -> writeFlowBulk (connData conn) $ reverse packets + Right _ -> return () + writeTVar (peerConnection peer) (Right conn) - | otherwise -> atomically $ do - logd $ show paddr ++ ": invalid objects" - logd $ show objs + forkServerThread server $ forever $ do + (secure, TransportPacket header objs) <- readFlowIO $ connData conn + prefs <- forM objs $ storeObject $ peerInStorage peer + identity <- readMVar serverIdentity_ + let svcs = map someServiceID serverServices + handlePacket identity secure peer chanSvc svcs header prefs - _ -> do atomically $ logd $ show paddr ++ ": invalid objects" + erebosNetworkProtocol logd protocolRawPath protocolNewConnection forkServerThread server $ withSocketsDo $ do let hints = defaultHints @@ -383,87 +309,6 @@ stopServer :: Server -> IO () stopServer Server {..} = do mapM_ killThread =<< takeMVar serverThreads -sendWorker :: Peer -> IO () -sendWorker peer = do - startTime <- getTime MonotonicRaw - nowVar <- newTVarIO startTime - waitVar <- newTVarIO startTime - - let waitTill time = forkServerThread (peerServer peer) $ do - now <- getTime MonotonicRaw - when (time > now) $ - threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 - atomically . writeTVar nowVar =<< getTime MonotonicRaw - - let sendBytes sp = do - when (not $ null $ spAckedBy sp) $ do - now <- getTime MonotonicRaw - atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp - { spTime = now - , spRetryCount = spRetryCount sp + 1 - } - case peerAddress peer of - DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr - PeerIceSession ice -> iceSend ice (spData sp) - - let sendNextPacket = do - (secure, ackedBy, packet@(TransportPacket header content)) <- - readTQueue (peerOutQueue peer) - - let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) - let plain = BL.toStrict $ BL.concat $ - (serializeObject $ transportToObject header) - : map lazyLoadBytes content - - mbch <- readTVar (peerChannel peer) >>= \case - ChannelEstablished ch -> return (Just ch) - _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):) - return Nothing - - return $ do - mbs <- case mbch of - Just ch -> do - runExceptT (channelEncrypt ch plain) >>= \case - Right ctext -> return $ Just ctext - Left err -> do logd $ "Failed to encrypt data: " ++ err - return Nothing - Nothing | secure -> return Nothing - | otherwise -> return $ Just plain - - case mbs of - Just bs -> do - sendBytes $ SentPacket - { spTime = undefined - , spRetryCount = -1 - , spAckedBy = ackedBy - , spData = bs - } - Nothing -> return () - - let retransmitPacket = do - now <- readTVar nowVar - (sp, rest) <- readTVar (peerSentPackets peer) >>= \case - sps@(_:_) -> return (last sps, init sps) - _ -> retry - let nextTry = spTime sp + fromNanoSecs 1000000000 - if now < nextTry - then do - wait <- readTVar waitVar - if wait <= now || nextTry < wait - then do writeTVar waitVar nextTry - return $ waitTill nextTry - else retry - else do - writeTVar (peerSentPackets peer) rest - return $ sendBytes sp - - forever $ join $ atomically $ do - retransmitPacket <|> sendNextPacket - -processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM () -processAcknowledgements peer = mapM_ $ \hitem -> do - modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy - dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do (peer, npref) <- atomically (readTQueue $ serverDataResponse server) @@ -489,7 +334,7 @@ dataResponseWorker server = forever $ do Right _ -> return (Nothing, []) atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list - let reqs = concat $ map snd list + let reqs = map refDigest $ concat $ map snd list when (not $ null reqs) $ do let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] @@ -540,8 +385,7 @@ handlePacket :: UnifiedIdentity -> Bool -> TransportHeader -> [PartialRef] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer - processAcknowledgements peer headers - ochannel <- readTVar $ peerChannel peer + ochannel <- getPeerChannel peer let sidentity = idData identity plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat [ [ storedRef sidentity ] @@ -555,89 +399,89 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do let logd = liftSTM . writeTQueue (serverErrorLog server) forM_ headers $ \case - Acknowledged ref -> do - readTVarP (peerChannel peer) >>= \case - ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do - writeTVarP (peerChannel peer) $ ChannelEstablished ch - liftSTM $ finalizedChannel peer identity + Acknowledged dgst -> do + liftSTM (getPeerChannel peer) >>= \case + ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do + liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected ref -> do - logd $ "rejected by peer: " ++ show (refDigest ref) + Rejected dgst -> do + logd $ "rejected by peer: " ++ show dgst - DataRequest ref - | secure || refDigest ref `elem` plaintextRefs -> do - Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref - addHeader $ DataResponse ref - addAckedBy [ Acknowledged ref, Rejected ref ] + DataRequest dgst + | secure || dgst `elem` plaintextRefs -> do + Right mref <- liftSTM $ unsafeIOToSTM $ + copyRef (peerStorage peer) $ + partialRefFromDigest (peerInStorage peer) dgst + addHeader $ DataResponse dgst + addAckedBy [ Acknowledged dgst, Rejected dgst ] addBody $ mref | otherwise -> do - logd $ "unauthorized data request for " ++ show ref - addHeader $ Rejected ref + logd $ "unauthorized data request for " ++ show dgst + addHeader $ Rejected dgst - DataResponse ref -> if - | ref `elem` prefs -> do - addHeader $ Acknowledged ref - liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref) - | otherwise -> throwError $ "mismatched data response " ++ show ref + DataResponse dgst -> if + | Just pref <- find ((==dgst) . refDigest) prefs -> do + addHeader $ Acknowledged dgst + liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) + | otherwise -> throwError $ "mismatched data response " ++ show dgst - AnnounceSelf pref - | refDigest pref == refDigest (storedRef sidentity) -> return () + AnnounceSelf dgst + | dgst == refDigest (storedRef sidentity) -> return () | otherwise -> do - wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer + wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer readTVarP (peerIdentityVar peer) >>= \case PeerIdentityUnknown idwait -> do - let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity - addHeader $ AnnounceSelf ref + addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer _ -> return () - AnnounceUpdate ref -> do + AnnounceUpdate dgst -> do readTVarP (peerIdentityVar peer) >>= \case PeerIdentityFull _ -> do - void $ newWaitingRef ref $ handleIdentityUpdate peer - addHeader $ Acknowledged ref + void $ newWaitingRef dgst $ handleIdentityUpdate peer + addHeader $ Acknowledged dgst _ -> return () - TrChannelRequest reqref -> do + TrChannelRequest dgst -> do let process = do - addHeader $ Acknowledged reqref - wref <- newWaitingRef reqref $ handleChannelRequest peer identity - writeTVarP (peerChannel peer) $ ChannelPeerRequest wref - reject = addHeader $ Rejected reqref - - readTVarP (peerChannel peer) >>= \case - ChannelWait {} -> process - ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process + addHeader $ Acknowledged dgst + wref <- newWaitingRef dgst $ handleChannelRequest peer identity + liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref + reject = addHeader $ Rejected dgst + + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> process + ChannelOurRequest our | dgst < refDigest (storedRef our) -> process | otherwise -> reject ChannelPeerRequest {} -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process - TrChannelAccept accref -> do + TrChannelAccept dgst -> do let process = do - handleChannelAccept identity accref - readTVarP (peerChannel peer) >>= \case - ChannelWait {} -> process + handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> process ChannelOurRequest {} -> process ChannelPeerRequest {} -> process - ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process - | otherwise -> addHeader $ Rejected accref + ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process + | otherwise -> addHeader $ Rejected dgst ChannelEstablished {} -> process ServiceType _ -> return () - ServiceRef pref + ServiceRef dgst | not secure -> throwError $ "service packet without secure channel" | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do - if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -} + if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} then do - addHeader $ Acknowledged pref - void $ newWaitingRef pref $ \ref -> + addHeader $ Acknowledged dgst + void $ newWaitingRef dgst $ \ref -> liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show pref - | otherwise -> addHeader $ Rejected pref + else throwError $ "missing service object " ++ show dgst + | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" let logd = writeTQueue (serverErrorLog server) @@ -647,7 +491,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = Right ph -> do when (not $ null $ phHead ph) $ do let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) - writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet) + sendToPeerS' secure peer (phAckedBy ph) packet withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () @@ -660,18 +504,17 @@ withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback setupChannel identity peer upid = do req <- createChannelRequest (peerStorage peer) identity upid - let ist = peerInStorage peer - let reqref = partialRef ist $ storedRef req + let reqref = refDigest $ storedRef req let hitems = [ TrChannelRequest reqref - , AnnounceSelf $ partialRef ist $ storedRef $ idData identity + , AnnounceSelf $ refDigest $ storedRef $ idData identity ] liftIO $ atomically $ do - readTVar (peerChannel peer) >>= \case - ChannelWait -> do + getPeerChannel peer >>= \case + ChannelNone -> do sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ TransportPacket (TransportHeader hitems) [storedRef req] - writeTVar (peerChannel peer) $ ChannelOurRequest req + setPeerChannel peer $ ChannelOurRequest req _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -679,10 +522,10 @@ handleChannelRequest peer identity req = do withPeerIdentity peer $ \upid -> do (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req) liftIO $ atomically $ do - readTVar (peerChannel peer) >>= \case + getPeerChannel peer >>= \case ChannelPeerRequest wr | wrDigest wr == refDigest req -> do - writeTVar (peerChannel peer) $ ChannelOurAccept acc ch - let accref = (partialRef (peerInStorage peer) $ storedRef acc) + setPeerChannel peer $ ChannelOurAccept acc ch + let accref = refDigest $ storedRef acc header = TrChannelAccept accref ackedBy = [ Acknowledged accref, Rejected accref ] sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat @@ -702,32 +545,28 @@ handleChannelAccept identity accref = do Right acc -> do ch <- acceptedChannel identity upid (wrappedLoad acc) liftIO $ atomically $ do - sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] - writeTVar (peerChannel peer) $ ChannelEstablished ch - finalizedChannel peer identity + sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] + finalizedChannel peer ch identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> UnifiedIdentity -> STM () -finalizedChannel peer self = do - let ist = peerInStorage peer +finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () +finalizedChannel peer@Peer {..} ch self = do + setPeerChannel peer $ ChannelEstablished ch -- Identity update - do - let selfRef = partialRef ist $ storedRef $ idData $ self - updateRefs = selfRef : map (partialRef ist . storedRef) (idUpdates self) + writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do + let selfRef = refDigest $ storedRef $ idData $ self + updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self) ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs -- Notify services about new peer - readTVar (peerIdentityVar peer) >>= \case + readTVar peerIdentityVar >>= \case PeerIdentityFull _ -> notifyServicesOfPeer peer _ -> return () - -- Outstanding service packets - mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] - handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback handleIdentityAnnounce self peer ref = liftIO $ atomically $ do @@ -775,22 +614,14 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do mkPeer :: Server -> PeerAddress -> IO Peer -mkPeer server paddr = do - pst <- deriveEphemeralStorage $ serverStorage server - peer <- Peer - <$> pure paddr - <*> pure server - <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) - <*> newTVarIO ChannelWait - <*> pure pst - <*> derivePartialStorage pst - <*> newTQueueIO - <*> newTVarIO [] - <*> newTMVarIO M.empty - <*> newTVarIO [] - <*> newTMVarIO [] - forkServerThread server $ sendWorker peer - return peer +mkPeer peerServer_ peerAddress = do + peerConnection <- newTVarIO (Left []) + peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO [] + peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_ + peerInStorage <- derivePartialStorage peerStorage_ + peerServiceState <- newTMVarIO M.empty + peerWaitingRefs <- newTMVarIO [] + return Peer {..} serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do @@ -798,10 +629,10 @@ serverPeer server paddr = do serverPeer' server (DatagramAddress sock paddr) serverPeerIce :: Server -> IceSession -> IO Peer -serverPeerIce server ice = do +serverPeerIce server@Server {..} ice = do let paddr = PeerIceSession ice peer <- serverPeer' server paddr - iceSetChan ice (paddr,) $ serverChanPacket server + iceSetChan ice $ mapPath undefined (paddr,) serverRawPath return peer serverPeer' :: Server -> PeerAddress -> IO Peer @@ -814,9 +645,10 @@ serverPeer' server paddr = do return (M.insert paddr peer pvalue, (peer, True)) when hello $ do identity <- serverIdentity server - atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $ - TransportPacket - (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) + atomically $ do + writeFlow (serverNewConnection server) paddr + sendToPeerPlain peer [] $ TransportPacket + (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]) [] return peer @@ -830,23 +662,28 @@ sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do let st = peerStorage peer - pst = peerInStorage peer srefs <- liftIO $ fmap catMaybes $ forM parts $ \case ServiceReply (Left x) use -> Just . (,use) <$> store st x ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) ServiceFinally act -> act >> return Nothing - prefs <- mapM (copyRef pst . fst) srefs + let dgsts = map (refDigest . fst) srefs let content = map fst $ filter snd srefs - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) + header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ] + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] liftIO $ atomically $ sendToPeerS peer ackedBy packet -sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet) +sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS' secure Peer {..} ackedBy packet = do + readTVar peerConnection >>= \case + Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs + Right conn -> writeFlow (connData conn) (secure, packet, ackedBy) + +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS = sendToPeerS' True -sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () -sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet) +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerPlain = sendToPeerS' False sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do -- cgit v1.2.3