summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Erebos/Network/Protocol.hs60
1 files changed, 48 insertions, 12 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index b0047fc..d759994 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -68,6 +68,9 @@ protocolVersion = T.pack "0.1"
protocolVersions :: [Text]
protocolVersions = [protocolVersion]
+keepAliveInternal :: TimeSpec
+keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int)
+
data TransportPacket a = TransportPacket TransportHeader [a]
@@ -186,6 +189,7 @@ data Connection addr = Connection
, cReservedPackets :: TVar Int
, cSentPackets :: TVar [SentPacket]
, cToAcknowledge :: TVar [Integer]
+ , cNextKeepAlive :: TVar (Maybe TimeSpec)
, cInStreams :: TVar [(Word8, Stream)]
, cOutStreams :: TVar [(Word8, Stream)]
}
@@ -487,6 +491,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
cReservedPackets <- newTVar 0
cSentPackets <- newTVar []
cToAcknowledge <- newTVar []
+ cNextKeepAlive <- newTVar Nothing
cInStreams <- newTVar []
cOutStreams <- newTVar []
let conn = Connection {..}
@@ -548,6 +553,7 @@ processIncoming gs@GlobalState {..} = do
Nothing -> throwError "empty packet"
+ now <- getTime Monotonic
runExceptT parse >>= \case
Right (Left (secure, objs, mbcounter))
| hobj:content <- objs
@@ -562,6 +568,7 @@ processIncoming gs@GlobalState {..} = do
case mbup of
Just up -> putTMVar gNextUp (conn, (secure, up))
Nothing -> return ()
+ updateKeepAlive conn now
processAcknowledgements gs conn items
ioAfter
Nothing -> return ()
@@ -571,8 +578,9 @@ processIncoming gs@GlobalState {..} = do
gLog $ show objs
Right (Right (snum, seq8, content, counter))
- | Just Connection {..} <- mbconn
+ | Just conn@Connection {..} <- mbconn
-> atomically $ do
+ updateKeepAlive conn now
(lookup snum <$> readTVar cInStreams) >>= \case
Nothing ->
gLog $ "unexpected stream number " ++ show snum
@@ -713,7 +721,7 @@ reservePacket conn@Connection {..} = do
return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
-resendBytes Connection {..} reserved sp = do
+resendBytes conn@Connection {..} reserved sp = do
let GlobalState {..} = cGlobalState
now <- getTime Monotonic
atomically $ do
@@ -726,6 +734,7 @@ resendBytes Connection {..} reserved sp = do
, spRetryCount = spRetryCount sp + 1
}
writeFlow gDataFlow (cAddress, spData sp)
+ updateKeepAlive conn now
sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()
sendBytes conn reserved bs = resendBytes conn reserved
@@ -738,6 +747,12 @@ sendBytes conn reserved bs = resendBytes conn reserved
, spData = bs
}
+updateKeepAlive :: Connection addr -> TimeSpec -> STM ()
+updateKeepAlive Connection {..} now = do
+ let next = now + keepAliveInternal
+ writeTVar cNextKeepAlive $ Just next
+
+
processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
processOutgoing gs@GlobalState {..} = do
@@ -777,11 +792,12 @@ processOutgoing gs@GlobalState {..} = do
let onAck = sequence_ $ map (streamAccepted conn) $
catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
- let mkPlain extraHeaders =
- let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
- in BL.concat $
- (serializeObject $ transportToObject gStorage header)
- : map lazyLoadBytes content
+ let mkPlain extraHeaders
+ | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems =
+ BL.concat $
+ (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems)
+ : map lazyLoadBytes content
+ | otherwise = BL.empty
let usePlaintext = do
plain <- mkPlain <$> generateCookieHeaders conn channel
@@ -811,6 +827,13 @@ processOutgoing gs@GlobalState {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
+ let waitUntil :: TimeSpec -> TimeSpec -> STM ()
+ waitUntil now till = do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || till < nextTimeout
+ then writeTVar gNextTimeout till
+ else retry
+
let retransmitPacket :: Connection addr -> STM (IO ())
retransmitPacket conn@Connection {..} = do
now <- readTVar gNowVar
@@ -819,11 +842,8 @@ processOutgoing gs@GlobalState {..} = do
_ -> retry
let nextTry = spTime sp + fromNanoSecs 1000000000
if | now < nextTry -> do
- nextTimeout <- readTVar gNextTimeout
- if nextTimeout <= now || nextTry < nextTimeout
- then do writeTVar gNextTimeout nextTry
- return $ return ()
- else retry
+ waitUntil now nextTry
+ return $ return ()
| spRetryCount sp < 2 -> do
reserved <- reservePacket conn
writeTVar cSentPackets rest
@@ -863,11 +883,27 @@ processOutgoing gs@GlobalState {..} = do
writeTVar gIdentity (nid, cur : past)
return $ return ()
+ let sendKeepAlive :: Connection addr -> STM (IO ())
+ sendKeepAlive Connection {..} = do
+ readTVar cNextKeepAlive >>= \case
+ Nothing -> retry
+ Just next -> do
+ now <- readTVar gNowVar
+ if next <= now
+ then do
+ identity <- fst <$> readTVar gIdentity
+ let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
+ writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], [])
+ else do
+ waitUntil now next
+ return $ return ()
+
conns <- readTVar gConnections
msum $ concat $
[ map retransmitPacket conns
, map sendNextPacket conns
, [ handleControlRequests ]
+ , map sendKeepAlive conns
]
processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ())