From c51983c7d268d9501f3ff3e0e50f5b0293c6d788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 22 Jan 2022 21:16:40 +0100 Subject: Network: retransmission of lost packets --- src/Network.hs | 210 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 146 insertions(+), 64 deletions(-) (limited to 'src') diff --git a/src/Network.hs b/src/Network.hs index e78ae7c..01caef7 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -17,6 +17,7 @@ module Network ( discoveryPort, ) where +import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Exception @@ -42,6 +43,8 @@ import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket import qualified Network.Socket.ByteString as S +import System.Clock + import Channel import ICE import Identity @@ -66,7 +69,6 @@ data Server = Server , serverIdentity_ :: MVar UnifiedIdentity , serverSocket :: MVar Socket , serverChanPacket :: Chan (PeerAddress, BC.ByteString) - , serverOutQueue :: TQueue (Peer, Bool, TransportPacket) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) , serverIOActions :: TQueue (ExceptT String IO ()) , serverServices :: [SomeService] @@ -101,11 +103,20 @@ data Peer = Peer , peerChannel :: TVar PeerChannel , peerStorage :: Storage , peerInStorage :: PartialStorage + , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket) + , peerSentPackets :: TVar [SentPacket] , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) - , peerServiceOutQueue :: TVar [TransportPacket] + , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)] , peerWaitingRefs :: TMVar [WaitingRef] } +data SentPacket = SentPacket + { spTime :: TimeSpec + , spRetryCount :: Int + , spAckedBy :: [TransportHeaderItem] + , spData :: BC.ByteString + } + peerServer :: Peer -> Server peerServer = peerServer_ @@ -226,7 +237,6 @@ startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeSer startServer opt origHead logd' services = do let storage = refStorage $ headRef origHead chanPacket <- newChan - outQueue <- newTQueueIO dataResponse <- newTQueueIO ioActions <- newTQueueIO chanPeer <- newTChanIO @@ -244,7 +254,6 @@ startServer opt origHead logd' services = do , serverIdentity_ = midentity , serverSocket = ssocket , serverChanPacket = chanPacket - , serverOutQueue = outQueue , serverDataResponse = dataResponse , serverIOActions = ioActions , serverServices = services @@ -258,7 +267,6 @@ startServer opt origHead logd' services = do void $ forkIO $ forever $ do logd' =<< atomically (readTQueue errlog) - void $ forkIO $ sendWorker server void $ forkIO $ dataResponseWorker server void $ forkIO $ forever $ do either (atomically . logd) return =<< runExceptT =<< @@ -285,25 +293,27 @@ startServer opt origHead logd' services = do let announceUpdate identity = do st <- derivePartialStorage storage - let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) : - map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity) - let packet = TransportPacket (TransportHeader hitems) [] + let selfRef = partialRef st $ storedRef $ idData identity + updateRefs = map (partialRef st . storedRef) $ idUpdates identity + hitems = AnnounceSelf selfRef : map AnnounceUpdate updateRefs + packet = TransportPacket (TransportHeader $ hitems) [] ps <- readMVar peers forM_ ps $ \peer -> atomically $ do ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case (PeerIdentityFull _, ChannelEstablished _) -> - writeTQueue outQueue (peer, True, packet) + writeTQueue (peerOutQueue peer) (True, [], packet) _ -> return () let shareState self shared peer = do - let hitems = (ServiceType $ serviceID @SyncService Proxy) : - map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared + let refs = map (partialRef (peerInStorage peer) . storedRef) shared + hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ] packet = TransportPacket (TransportHeader hitems) $ map storedRef shared atomically $ readTVar (peerIdentityVar peer) >>= \case PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do - sendToPeerS peer packet + sendToPeerS peer ackedBy packet _ -> return () void $ watchHead origHead $ \h -> do @@ -373,36 +383,86 @@ startServer opt origHead logd' services = do return server -sendWorker :: Server -> IO () -sendWorker server = forever $ do - (peer, secure, packet@(TransportPacket header content)) <- - atomically (readTQueue $ serverOutQueue server) - - let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) - let plain = BL.toStrict $ BL.concat $ - (serializeObject $ transportToObject header) - : map lazyLoadBytes content - mbs <- do - mbch <- atomically $ do - readTVar (peerChannel peer) >>= \case +sendWorker :: Peer -> IO () +sendWorker peer = do + startTime <- getTime MonotonicRaw + nowVar <- newTVarIO startTime + waitVar <- newTVarIO startTime + + let waitTill time = void $ forkIO $ do + now <- getTime MonotonicRaw + when (time > now) $ + threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 + atomically . writeTVar nowVar =<< getTime MonotonicRaw + + let sendBytes sp = do + when (not $ null $ spAckedBy sp) $ do + now <- getTime MonotonicRaw + atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp + { spTime = now + , spRetryCount = spRetryCount sp + 1 + } + case peerAddress peer of + DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr + PeerIceSession ice -> iceSend ice (spData sp) + + let sendNextPacket = do + (secure, ackedBy, packet@(TransportPacket header content)) <- + readTQueue (peerOutQueue peer) + + let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) + let plain = BL.toStrict $ BL.concat $ + (serializeObject $ transportToObject header) + : map lazyLoadBytes content + + mbch <- readTVar (peerChannel peer) >>= \case ChannelEstablished ch -> return (Just ch) - _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:) + _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):) return Nothing - case mbch of - Just ch -> do - runExceptT (channelEncrypt ch plain) >>= \case - Right ctext -> return $ Just ctext - Left err -> do logd $ "Failed to encrypt data: " ++ err - return Nothing - Nothing | secure -> return Nothing - | otherwise -> return $ Just plain - - case mbs of - Just bs -> case peerAddress peer of - DatagramAddress sock addr -> void $ S.sendTo sock bs addr - PeerIceSession ice -> iceSend ice bs - Nothing -> return () + return $ do + mbs <- case mbch of + Just ch -> do + runExceptT (channelEncrypt ch plain) >>= \case + Right ctext -> return $ Just ctext + Left err -> do logd $ "Failed to encrypt data: " ++ err + return Nothing + Nothing | secure -> return Nothing + | otherwise -> return $ Just plain + + case mbs of + Just bs -> do + sendBytes $ SentPacket + { spTime = undefined + , spRetryCount = -1 + , spAckedBy = ackedBy + , spData = bs + } + Nothing -> return () + + let retransmitPacket = do + now <- readTVar nowVar + (sp, rest) <- readTVar (peerSentPackets peer) >>= \case + sps@(_:_) -> return (last sps, init sps) + _ -> retry + let nextTry = spTime sp + fromNanoSecs 1000000000 + if now < nextTry + then do + wait <- readTVar waitVar + if wait <= now || nextTry < wait + then do writeTVar waitVar nextTry + return $ waitTill nextTry + else retry + else do + writeTVar (peerSentPackets peer) rest + return $ sendBytes sp + + forever $ join $ atomically $ do + retransmitPacket <|> sendNextPacket + +processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM () +processAcknowledgements peer = mapM_ $ \hitem -> do + modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do @@ -432,7 +492,8 @@ dataResponseWorker server = forever $ do let reqs = concat $ map snd list when (not $ null reqs) $ do let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] - atomically $ sendToPeerPlain peer packet + ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] + atomically $ sendToPeerPlain peer ackedBy packet newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a } @@ -456,12 +517,16 @@ modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v data PacketHandlerState = PacketHandlerState { phPeer :: Peer , phHead :: [TransportHeaderItem] + , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] } addHeader :: TransportHeaderItem -> PacketHandler () addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph } +addAckedBy :: [TransportHeaderItem] -> PacketHandler () +addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs } + addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } @@ -475,6 +540,7 @@ handlePacket :: Head LocalState -> UnifiedIdentity -> Bool -> TransportHeader -> [PartialRef] -> IO () handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer + processAcknowledgements peer headers ochannel <- readTVar $ peerChannel peer let sidentity = idData identity plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat @@ -486,7 +552,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers _ -> [] ] - res <- runExceptT $ flip execStateT (PacketHandlerState peer [] []) $ unPacketHandler $ do + res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do let logd = liftSTM . writeTQueue (serverErrorLog server) forM_ headers $ \case Acknowledged ref -> do @@ -503,6 +569,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers | secure || refDigest ref `elem` plaintextRefs -> do Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref addHeader $ DataResponse ref + addAckedBy [ Acknowledged ref, Rejected ref ] addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show ref @@ -520,15 +587,17 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer readTVarP (peerIdentityVar peer) >>= \case PeerIdentityUnknown idwait -> do - addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity + let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity + addHeader $ AnnounceSelf ref writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer - _ -> return () + _ -> addHeader $ Acknowledged pref AnnounceUpdate ref -> do readTVarP (peerIdentityVar peer) >>= \case PeerIdentityFull _ -> do void $ newWaitingRef ref $ handleIdentityUpdate peer + addHeader $ Acknowledged ref _ -> return () TrChannelRequest reqref -> do @@ -578,7 +647,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers Right ph -> do when (not $ null $ phHead ph) $ do let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) - writeTQueue (serverOutQueue server) (peer, secure, packet) + writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet) withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () @@ -592,14 +661,16 @@ setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback setupChannel identity peer upid = do req <- createChannelRequest (peerStorage peer) identity upid let ist = peerInStorage peer + let reqref = partialRef ist $ storedRef req let hitems = - [ TrChannelRequest $ partialRef ist $ storedRef req + [ TrChannelRequest reqref , AnnounceSelf $ partialRef ist $ storedRef $ idData identity ] liftIO $ atomically $ do readTVar (peerChannel peer) >>= \case ChannelWait -> do - sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req] + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] writeTVar (peerChannel peer) $ ChannelOurRequest req _ -> return () @@ -611,8 +682,10 @@ handleChannelRequest peer identity req = do readTVar (peerChannel peer) >>= \case ChannelPeerRequest wr | wrDigest wr == refDigest req -> do writeTVar (peerChannel peer) $ ChannelOurAccept acc ch - let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc) - sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat + let accref = (partialRef (peerInStorage peer) $ storedRef acc) + header = TrChannelAccept accref + ackedBy = [ Acknowledged accref, Rejected accref ] + sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat [ [ storedRef $ acc ] , [ storedRef $ signedData $ fromStored acc ] , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] @@ -629,7 +702,7 @@ handleChannelAccept oh identity accref = do Right acc -> do ch <- acceptedChannel identity upid (wrappedLoad acc) liftIO $ atomically $ do - sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) [] + sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] writeTVar (peerChannel peer) $ ChannelEstablished ch finalizedChannel peer oh identity @@ -638,11 +711,14 @@ handleChannelAccept oh identity accref = do finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () finalizedChannel peer oh self = do - -- Identity update let ist = peerInStorage peer - sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $ - ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) : - ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self ) + + -- Identity update + do + let selfRef = partialRef ist $ storedRef $ idData $ self + updateRefs = map (partialRef ist . storedRef) $ idUpdates $ self + sendToPeerS peer [] $ flip TransportPacket [] $ TransportHeader $ + AnnounceSelf selfRef : map AnnounceUpdate updateRefs -- Shared state readTVar (peerIdentityVar peer) >>= \case @@ -650,14 +726,15 @@ finalizedChannel peer oh self = do writeTQueue (serverIOActions $ peerServer peer) $ do Just h <- liftIO $ reloadHead oh let shared = lsShared $ headObject h - let hitems = (ServiceType $ serviceID @SyncService Proxy) : - map (ServiceRef . partialRef ist . storedRef) shared - liftIO $ atomically $ sendToPeerS peer $ + let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs + srefs = map (partialRef ist . storedRef) shared + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ] + liftIO $ atomically $ sendToPeerS peer ackedBy $ TransportPacket (TransportHeader hitems) $ map storedRef shared _ -> return () -- Outstanding service packets - mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] + mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback @@ -700,16 +777,20 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do mkPeer :: Server -> PeerAddress -> IO Peer mkPeer server paddr = do pst <- deriveEphemeralStorage $ serverStorage server - Peer + peer <- Peer <$> pure paddr <*> pure server <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) <*> newTVarIO ChannelWait <*> pure pst <*> derivePartialStorage pst + <*> newTQueueIO + <*> newTVarIO [] <*> newTMVarIO M.empty <*> newTVarIO [] <*> newTMVarIO [] + void $ forkIO $ sendWorker peer + return peer serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do @@ -733,7 +814,7 @@ serverPeer' server paddr = do return (M.insert paddr peer pvalue, (peer, True)) when hello $ do identity <- serverIdentity server - atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $ + atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $ TransportPacket (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) [] @@ -756,13 +837,14 @@ sendToPeerList peer parts = do let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs) header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) packet = TransportPacket header content - liftIO $ atomically $ sendToPeerS peer packet + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet -sendToPeerS :: Peer -> TransportPacket -> STM () -sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet) +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () +sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet) -sendToPeerPlain :: Peer -> TransportPacket -> STM () -sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet) +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () +sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet) sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do -- cgit v1.2.3