diff options
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 60 |
1 files changed, 48 insertions, 12 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index b0047fc..d759994 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -68,6 +68,9 @@ protocolVersion = T.pack "0.1" protocolVersions :: [Text] protocolVersions = [protocolVersion] +keepAliveInternal :: TimeSpec +keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int) + data TransportPacket a = TransportPacket TransportHeader [a] @@ -186,6 +189,7 @@ data Connection addr = Connection , cReservedPackets :: TVar Int , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] + , cNextKeepAlive :: TVar (Maybe TimeSpec) , cInStreams :: TVar [(Word8, Stream)] , cOutStreams :: TVar [(Word8, Stream)] } @@ -487,6 +491,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cReservedPackets <- newTVar 0 cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] + cNextKeepAlive <- newTVar Nothing cInStreams <- newTVar [] cOutStreams <- newTVar [] let conn = Connection {..} @@ -548,6 +553,7 @@ processIncoming gs@GlobalState {..} = do Nothing -> throwError "empty packet" + now <- getTime Monotonic runExceptT parse >>= \case Right (Left (secure, objs, mbcounter)) | hobj:content <- objs @@ -562,6 +568,7 @@ processIncoming gs@GlobalState {..} = do case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () + updateKeepAlive conn now processAcknowledgements gs conn items ioAfter Nothing -> return () @@ -571,8 +578,9 @@ processIncoming gs@GlobalState {..} = do gLog $ show objs Right (Right (snum, seq8, content, counter)) - | Just Connection {..} <- mbconn + | Just conn@Connection {..} <- mbconn -> atomically $ do + updateKeepAlive conn now (lookup snum <$> readTVar cInStreams) >>= \case Nothing -> gLog $ "unexpected stream number " ++ show snum @@ -713,7 +721,7 @@ reservePacket conn@Connection {..} = do return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () -resendBytes Connection {..} reserved sp = do +resendBytes conn@Connection {..} reserved sp = do let GlobalState {..} = cGlobalState now <- getTime Monotonic atomically $ do @@ -726,6 +734,7 @@ resendBytes Connection {..} reserved sp = do , spRetryCount = spRetryCount sp + 1 } writeFlow gDataFlow (cAddress, spData sp) + updateKeepAlive conn now sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () sendBytes conn reserved bs = resendBytes conn reserved @@ -738,6 +747,12 @@ sendBytes conn reserved bs = resendBytes conn reserved , spData = bs } +updateKeepAlive :: Connection addr -> TimeSpec -> STM () +updateKeepAlive Connection {..} now = do + let next = now + keepAliveInternal + writeTVar cNextKeepAlive $ Just next + + processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) processOutgoing gs@GlobalState {..} = do @@ -777,11 +792,12 @@ processOutgoing gs@GlobalState {..} = do let onAck = sequence_ $ map (streamAccepted conn) $ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let mkPlain extraHeaders = - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems - in BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders + | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems = + BL.concat $ + (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems) + : map lazyLoadBytes content + | otherwise = BL.empty let usePlaintext = do plain <- mkPlain <$> generateCookieHeaders conn channel @@ -811,6 +827,13 @@ processOutgoing gs@GlobalState {..} = do sendBytes conn mbReserved' bs Nothing -> return () + let waitUntil :: TimeSpec -> TimeSpec -> STM () + waitUntil now till = do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || till < nextTimeout + then writeTVar gNextTimeout till + else retry + let retransmitPacket :: Connection addr -> STM (IO ()) retransmitPacket conn@Connection {..} = do now <- readTVar gNowVar @@ -819,11 +842,8 @@ processOutgoing gs@GlobalState {..} = do _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 if | now < nextTry -> do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry + waitUntil now nextTry + return $ return () | spRetryCount sp < 2 -> do reserved <- reservePacket conn writeTVar cSentPackets rest @@ -863,11 +883,27 @@ processOutgoing gs@GlobalState {..} = do writeTVar gIdentity (nid, cur : past) return $ return () + let sendKeepAlive :: Connection addr -> STM (IO ()) + sendKeepAlive Connection {..} = do + readTVar cNextKeepAlive >>= \case + Nothing -> retry + Just next -> do + now <- readTVar gNowVar + if next <= now + then do + identity <- fst <$> readTVar gIdentity + let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] + writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], []) + else do + waitUntil now next + return $ return () + conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns , [ handleControlRequests ] + , map sendKeepAlive conns ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) |