summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2022-01-22 21:16:40 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2022-01-23 22:20:16 +0100
commitc51983c7d268d9501f3ff3e0e50f5b0293c6d788 (patch)
treeabe373f22e1bc1b05a8c7e77d13225aad91a661a
parent13c7c7ba82c455c077010b1d2fa6d0e332de7601 (diff)
Network: retransmission of lost packets
-rw-r--r--erebos.cabal1
-rw-r--r--src/Network.hs210
2 files changed, 147 insertions, 64 deletions
diff --git a/erebos.cabal b/erebos.cabal
index bf091d1..d6fcdc5 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -63,6 +63,7 @@ executable erebos
binary >=0.8 && <0.9,
bytestring >=0.10 && <0.12,
cereal >= 0.5 && <0.6,
+ clock >=0.8 && < 0.9,
containers >= 0.6 && <0.7,
cryptonite >=0.25 && <0.30,
deepseq >= 1.4 && <1.5,
diff --git a/src/Network.hs b/src/Network.hs
index e78ae7c..01caef7 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -17,6 +17,7 @@ module Network (
discoveryPort,
) where
+import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
@@ -42,6 +43,8 @@ import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket
import qualified Network.Socket.ByteString as S
+import System.Clock
+
import Channel
import ICE
import Identity
@@ -66,7 +69,6 @@ data Server = Server
, serverIdentity_ :: MVar UnifiedIdentity
, serverSocket :: MVar Socket
, serverChanPacket :: Chan (PeerAddress, BC.ByteString)
- , serverOutQueue :: TQueue (Peer, Bool, TransportPacket)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
, serverIOActions :: TQueue (ExceptT String IO ())
, serverServices :: [SomeService]
@@ -101,11 +103,20 @@ data Peer = Peer
, peerChannel :: TVar PeerChannel
, peerStorage :: Storage
, peerInStorage :: PartialStorage
+ , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket)
+ , peerSentPackets :: TVar [SentPacket]
, peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
- , peerServiceOutQueue :: TVar [TransportPacket]
+ , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)]
, peerWaitingRefs :: TMVar [WaitingRef]
}
+data SentPacket = SentPacket
+ { spTime :: TimeSpec
+ , spRetryCount :: Int
+ , spAckedBy :: [TransportHeaderItem]
+ , spData :: BC.ByteString
+ }
+
peerServer :: Peer -> Server
peerServer = peerServer_
@@ -226,7 +237,6 @@ startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeSer
startServer opt origHead logd' services = do
let storage = refStorage $ headRef origHead
chanPacket <- newChan
- outQueue <- newTQueueIO
dataResponse <- newTQueueIO
ioActions <- newTQueueIO
chanPeer <- newTChanIO
@@ -244,7 +254,6 @@ startServer opt origHead logd' services = do
, serverIdentity_ = midentity
, serverSocket = ssocket
, serverChanPacket = chanPacket
- , serverOutQueue = outQueue
, serverDataResponse = dataResponse
, serverIOActions = ioActions
, serverServices = services
@@ -258,7 +267,6 @@ startServer opt origHead logd' services = do
void $ forkIO $ forever $ do
logd' =<< atomically (readTQueue errlog)
- void $ forkIO $ sendWorker server
void $ forkIO $ dataResponseWorker server
void $ forkIO $ forever $ do
either (atomically . logd) return =<< runExceptT =<<
@@ -285,25 +293,27 @@ startServer opt origHead logd' services = do
let announceUpdate identity = do
st <- derivePartialStorage storage
- let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) :
- map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity)
- let packet = TransportPacket (TransportHeader hitems) []
+ let selfRef = partialRef st $ storedRef $ idData identity
+ updateRefs = map (partialRef st . storedRef) $ idUpdates identity
+ hitems = AnnounceSelf selfRef : map AnnounceUpdate updateRefs
+ packet = TransportPacket (TransportHeader $ hitems) []
ps <- readMVar peers
forM_ ps $ \peer -> atomically $ do
((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case
(PeerIdentityFull _, ChannelEstablished _) ->
- writeTQueue outQueue (peer, True, packet)
+ writeTQueue (peerOutQueue peer) (True, [], packet)
_ -> return ()
let shareState self shared peer = do
- let hitems = (ServiceType $ serviceID @SyncService Proxy) :
- map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared
+ let refs = map (partialRef (peerInStorage peer) . storedRef) shared
+ hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ]
packet = TransportPacket (TransportHeader hitems) $
map storedRef shared
atomically $ readTVar (peerIdentityVar peer) >>= \case
PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
- sendToPeerS peer packet
+ sendToPeerS peer ackedBy packet
_ -> return ()
void $ watchHead origHead $ \h -> do
@@ -373,36 +383,86 @@ startServer opt origHead logd' services = do
return server
-sendWorker :: Server -> IO ()
-sendWorker server = forever $ do
- (peer, secure, packet@(TransportPacket header content)) <-
- atomically (readTQueue $ serverOutQueue server)
-
- let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer)
- let plain = BL.toStrict $ BL.concat $
- (serializeObject $ transportToObject header)
- : map lazyLoadBytes content
- mbs <- do
- mbch <- atomically $ do
- readTVar (peerChannel peer) >>= \case
+sendWorker :: Peer -> IO ()
+sendWorker peer = do
+ startTime <- getTime MonotonicRaw
+ nowVar <- newTVarIO startTime
+ waitVar <- newTVarIO startTime
+
+ let waitTill time = void $ forkIO $ do
+ now <- getTime MonotonicRaw
+ when (time > now) $
+ threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
+ atomically . writeTVar nowVar =<< getTime MonotonicRaw
+
+ let sendBytes sp = do
+ when (not $ null $ spAckedBy sp) $ do
+ now <- getTime MonotonicRaw
+ atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp
+ { spTime = now
+ , spRetryCount = spRetryCount sp + 1
+ }
+ case peerAddress peer of
+ DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr
+ PeerIceSession ice -> iceSend ice (spData sp)
+
+ let sendNextPacket = do
+ (secure, ackedBy, packet@(TransportPacket header content)) <-
+ readTQueue (peerOutQueue peer)
+
+ let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer)
+ let plain = BL.toStrict $ BL.concat $
+ (serializeObject $ transportToObject header)
+ : map lazyLoadBytes content
+
+ mbch <- readTVar (peerChannel peer) >>= \case
ChannelEstablished ch -> return (Just ch)
- _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:)
+ _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):)
return Nothing
- case mbch of
- Just ch -> do
- runExceptT (channelEncrypt ch plain) >>= \case
- Right ctext -> return $ Just ctext
- Left err -> do logd $ "Failed to encrypt data: " ++ err
- return Nothing
- Nothing | secure -> return Nothing
- | otherwise -> return $ Just plain
-
- case mbs of
- Just bs -> case peerAddress peer of
- DatagramAddress sock addr -> void $ S.sendTo sock bs addr
- PeerIceSession ice -> iceSend ice bs
- Nothing -> return ()
+ return $ do
+ mbs <- case mbch of
+ Just ch -> do
+ runExceptT (channelEncrypt ch plain) >>= \case
+ Right ctext -> return $ Just ctext
+ Left err -> do logd $ "Failed to encrypt data: " ++ err
+ return Nothing
+ Nothing | secure -> return Nothing
+ | otherwise -> return $ Just plain
+
+ case mbs of
+ Just bs -> do
+ sendBytes $ SentPacket
+ { spTime = undefined
+ , spRetryCount = -1
+ , spAckedBy = ackedBy
+ , spData = bs
+ }
+ Nothing -> return ()
+
+ let retransmitPacket = do
+ now <- readTVar nowVar
+ (sp, rest) <- readTVar (peerSentPackets peer) >>= \case
+ sps@(_:_) -> return (last sps, init sps)
+ _ -> retry
+ let nextTry = spTime sp + fromNanoSecs 1000000000
+ if now < nextTry
+ then do
+ wait <- readTVar waitVar
+ if wait <= now || nextTry < wait
+ then do writeTVar waitVar nextTry
+ return $ waitTill nextTry
+ else retry
+ else do
+ writeTVar (peerSentPackets peer) rest
+ return $ sendBytes sp
+
+ forever $ join $ atomically $ do
+ retransmitPacket <|> sendNextPacket
+
+processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM ()
+processAcknowledgements peer = mapM_ $ \hitem -> do
+ modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -432,7 +492,8 @@ dataResponseWorker server = forever $ do
let reqs = concat $ map snd list
when (not $ null reqs) $ do
let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []
- atomically $ sendToPeerPlain peer packet
+ ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ]
+ atomically $ sendToPeerPlain peer ackedBy packet
newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a }
@@ -456,12 +517,16 @@ modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v
data PacketHandlerState = PacketHandlerState
{ phPeer :: Peer
, phHead :: [TransportHeaderItem]
+ , phAckedBy :: [TransportHeaderItem]
, phBody :: [Ref]
}
addHeader :: TransportHeaderItem -> PacketHandler ()
addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph }
+addAckedBy :: [TransportHeaderItem] -> PacketHandler ()
+addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs }
+
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
@@ -475,6 +540,7 @@ handlePacket :: Head LocalState -> UnifiedIdentity -> Bool
-> TransportHeader -> [PartialRef] -> IO ()
handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
+ processAcknowledgements peer headers
ochannel <- readTVar $ peerChannel peer
let sidentity = idData identity
plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
@@ -486,7 +552,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
_ -> []
]
- res <- runExceptT $ flip execStateT (PacketHandlerState peer [] []) $ unPacketHandler $ do
+ res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do
let logd = liftSTM . writeTQueue (serverErrorLog server)
forM_ headers $ \case
Acknowledged ref -> do
@@ -503,6 +569,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
| secure || refDigest ref `elem` plaintextRefs -> do
Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref
addHeader $ DataResponse ref
+ addAckedBy [ Acknowledged ref, Rejected ref ]
addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show ref
@@ -520,15 +587,17 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityUnknown idwait -> do
- addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity
+ let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity
+ addHeader $ AnnounceSelf ref
writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer
- _ -> return ()
+ _ -> addHeader $ Acknowledged pref
AnnounceUpdate ref -> do
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityFull _ -> do
void $ newWaitingRef ref $ handleIdentityUpdate peer
+ addHeader $ Acknowledged ref
_ -> return ()
TrChannelRequest reqref -> do
@@ -578,7 +647,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
Right ph -> do
when (not $ null $ phHead ph) $ do
let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
- writeTQueue (serverOutQueue server) (peer, secure, packet)
+ writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet)
withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
@@ -592,14 +661,16 @@ setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
setupChannel identity peer upid = do
req <- createChannelRequest (peerStorage peer) identity upid
let ist = peerInStorage peer
+ let reqref = partialRef ist $ storedRef req
let hitems =
- [ TrChannelRequest $ partialRef ist $ storedRef req
+ [ TrChannelRequest reqref
, AnnounceSelf $ partialRef ist $ storedRef $ idData identity
]
liftIO $ atomically $ do
readTVar (peerChannel peer) >>= \case
ChannelWait -> do
- sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req]
+ sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
+ TransportPacket (TransportHeader hitems) [storedRef req]
writeTVar (peerChannel peer) $ ChannelOurRequest req
_ -> return ()
@@ -611,8 +682,10 @@ handleChannelRequest peer identity req = do
readTVar (peerChannel peer) >>= \case
ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
writeTVar (peerChannel peer) $ ChannelOurAccept acc ch
- let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc)
- sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat
+ let accref = (partialRef (peerInStorage peer) $ storedRef acc)
+ header = TrChannelAccept accref
+ ackedBy = [ Acknowledged accref, Rejected accref ]
+ sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat
[ [ storedRef $ acc ]
, [ storedRef $ signedData $ fromStored acc ]
, [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ]
@@ -629,7 +702,7 @@ handleChannelAccept oh identity accref = do
Right acc -> do
ch <- acceptedChannel identity upid (wrappedLoad acc)
liftIO $ atomically $ do
- sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) []
+ sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []
writeTVar (peerChannel peer) $ ChannelEstablished ch
finalizedChannel peer oh identity
@@ -638,11 +711,14 @@ handleChannelAccept oh identity accref = do
finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM ()
finalizedChannel peer oh self = do
- -- Identity update
let ist = peerInStorage peer
- sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $
- ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) :
- ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self )
+
+ -- Identity update
+ do
+ let selfRef = partialRef ist $ storedRef $ idData $ self
+ updateRefs = map (partialRef ist . storedRef) $ idUpdates $ self
+ sendToPeerS peer [] $ flip TransportPacket [] $ TransportHeader $
+ AnnounceSelf selfRef : map AnnounceUpdate updateRefs
-- Shared state
readTVar (peerIdentityVar peer) >>= \case
@@ -650,14 +726,15 @@ finalizedChannel peer oh self = do
writeTQueue (serverIOActions $ peerServer peer) $ do
Just h <- liftIO $ reloadHead oh
let shared = lsShared $ headObject h
- let hitems = (ServiceType $ serviceID @SyncService Proxy) :
- map (ServiceRef . partialRef ist . storedRef) shared
- liftIO $ atomically $ sendToPeerS peer $
+ let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs
+ srefs = map (partialRef ist . storedRef) shared
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy $
TransportPacket (TransportHeader hitems) $ map storedRef shared
_ -> return ()
-- Outstanding service packets
- mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []
+ mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []
handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
@@ -700,16 +777,20 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer server paddr = do
pst <- deriveEphemeralStorage $ serverStorage server
- Peer
+ peer <- Peer
<$> pure paddr
<*> pure server
<*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO [])
<*> newTVarIO ChannelWait
<*> pure pst
<*> derivePartialStorage pst
+ <*> newTQueueIO
+ <*> newTVarIO []
<*> newTMVarIO M.empty
<*> newTVarIO []
<*> newTMVarIO []
+ void $ forkIO $ sendWorker peer
+ return peer
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server paddr = do
@@ -733,7 +814,7 @@ serverPeer' server paddr = do
return (M.insert paddr peer pvalue, (peer, True))
when hello $ do
identity <- serverIdentity server
- atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $
+ atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $
TransportPacket
(TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ])
[]
@@ -756,13 +837,14 @@ sendToPeerList peer parts = do
let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs)
header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)
packet = TransportPacket header content
- liftIO $ atomically $ sendToPeerS peer packet
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
-sendToPeerS :: Peer -> TransportPacket -> STM ()
-sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet)
+sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
+sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
-sendToPeerPlain :: Peer -> TransportPacket -> STM ()
-sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet)
+sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
+sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet)
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith peer fobj = do