diff options
Diffstat (limited to 'src/Erebos/Network')
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 60 | 
1 files changed, 48 insertions, 12 deletions
| diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index b0047fc..d759994 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -68,6 +68,9 @@ protocolVersion = T.pack "0.1"  protocolVersions :: [Text]  protocolVersions = [protocolVersion] +keepAliveInternal :: TimeSpec +keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int) +  data TransportPacket a = TransportPacket TransportHeader [a] @@ -186,6 +189,7 @@ data Connection addr = Connection      , cReservedPackets :: TVar Int      , cSentPackets :: TVar [SentPacket]      , cToAcknowledge :: TVar [Integer] +    , cNextKeepAlive :: TVar (Maybe TimeSpec)      , cInStreams :: TVar [(Word8, Stream)]      , cOutStreams :: TVar [(Word8, Stream)]      } @@ -487,6 +491,7 @@ newConnection cGlobalState@GlobalState {..} addr = do      cReservedPackets <- newTVar 0      cSentPackets <- newTVar []      cToAcknowledge <- newTVar [] +    cNextKeepAlive <- newTVar Nothing      cInStreams <- newTVar []      cOutStreams <- newTVar []      let conn = Connection {..} @@ -548,6 +553,7 @@ processIncoming gs@GlobalState {..} = do                  Nothing -> throwError "empty packet" +        now <- getTime Monotonic          runExceptT parse >>= \case              Right (Left (secure, objs, mbcounter))                  | hobj:content <- objs @@ -562,6 +568,7 @@ processIncoming gs@GlobalState {..} = do                              case mbup of                                  Just up -> putTMVar gNextUp (conn, (secure, up))                                  Nothing -> return () +                            updateKeepAlive conn now                              processAcknowledgements gs conn items                          ioAfter                      Nothing -> return () @@ -571,8 +578,9 @@ processIncoming gs@GlobalState {..} = do                        gLog $ show objs              Right (Right (snum, seq8, content, counter)) -                | Just Connection {..} <- mbconn +                | Just conn@Connection {..} <- mbconn                  -> atomically $ do +                    updateKeepAlive conn now                      (lookup snum <$> readTVar cInStreams) >>= \case                          Nothing ->                              gLog $ "unexpected stream number " ++ show snum @@ -713,7 +721,7 @@ reservePacket conn@Connection {..} = do      return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)  resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () -resendBytes Connection {..} reserved sp = do +resendBytes conn@Connection {..} reserved sp = do      let GlobalState {..} = cGlobalState      now <- getTime Monotonic      atomically $ do @@ -726,6 +734,7 @@ resendBytes Connection {..} reserved sp = do                  , spRetryCount = spRetryCount sp + 1                  }          writeFlow gDataFlow (cAddress, spData sp) +        updateKeepAlive conn now  sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()  sendBytes conn reserved bs = resendBytes conn reserved @@ -738,6 +747,12 @@ sendBytes conn reserved bs = resendBytes conn reserved          , spData = bs          } +updateKeepAlive :: Connection addr -> TimeSpec -> STM () +updateKeepAlive Connection {..} now = do +    let next = now + keepAliveInternal +    writeTVar cNextKeepAlive $ Just next + +  processOutgoing :: forall addr. GlobalState addr -> STM (IO ())  processOutgoing gs@GlobalState {..} = do @@ -777,11 +792,12 @@ processOutgoing gs@GlobalState {..} = do                  let onAck = sequence_ $ map (streamAccepted conn) $                          catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) -                let mkPlain extraHeaders = -                        let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems -                         in BL.concat $ -                            (serializeObject $ transportToObject gStorage header) -                            : map lazyLoadBytes content +                let mkPlain extraHeaders +                        | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems = +                             BL.concat $ +                                (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems) +                                : map lazyLoadBytes content +                        | otherwise = BL.empty                  let usePlaintext = do                          plain <- mkPlain <$> generateCookieHeaders conn channel @@ -811,6 +827,13 @@ processOutgoing gs@GlobalState {..} = do                          sendBytes conn mbReserved' bs                      Nothing -> return () +    let waitUntil :: TimeSpec -> TimeSpec -> STM () +        waitUntil now till = do +            nextTimeout <- readTVar gNextTimeout +            if nextTimeout <= now || till < nextTimeout +               then writeTVar gNextTimeout till +               else retry +      let retransmitPacket :: Connection addr -> STM (IO ())          retransmitPacket conn@Connection {..} = do              now <- readTVar gNowVar @@ -819,11 +842,8 @@ processOutgoing gs@GlobalState {..} = do                  _         -> retry              let nextTry = spTime sp + fromNanoSecs 1000000000              if | now < nextTry -> do -                    nextTimeout <- readTVar gNextTimeout -                    if nextTimeout <= now || nextTry < nextTimeout -                       then do writeTVar gNextTimeout nextTry -                               return $ return () -                       else retry +                    waitUntil now nextTry +                    return $ return ()                 | spRetryCount sp < 2 -> do                      reserved <- reservePacket conn                      writeTVar cSentPackets rest @@ -863,11 +883,27 @@ processOutgoing gs@GlobalState {..} = do                  writeTVar gIdentity (nid, cur : past)                  return $ return () +    let sendKeepAlive :: Connection addr -> STM (IO ()) +        sendKeepAlive Connection {..} = do +            readTVar cNextKeepAlive >>= \case +                Nothing -> retry +                Just next -> do +                    now <- readTVar gNowVar +                    if next <= now +                      then do +                        identity <- fst <$> readTVar gIdentity +                        let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] +                        writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], []) +                      else do +                        waitUntil now next +                    return $ return () +      conns <- readTVar gConnections      msum $ concat $          [ map retransmitPacket conns          , map sendNextPacket conns          , [ handleControlRequests ] +        , map sendKeepAlive conns          ]  processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) |