summaryrefslogtreecommitdiff
path: root/src/Erebos/Network/Protocol.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r--src/Erebos/Network/Protocol.hs877
1 files changed, 877 insertions, 0 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
new file mode 100644
index 0000000..a009ad1
--- /dev/null
+++ b/src/Erebos/Network/Protocol.hs
@@ -0,0 +1,877 @@
+module Erebos.Network.Protocol (
+ TransportPacket(..),
+ transportToObject,
+ TransportHeader(..),
+ TransportHeaderItem(..),
+ SecurityRequirement(..),
+
+ WaitingRef(..),
+ WaitingRefCallback,
+ wrDigest,
+
+ ChannelState(..),
+
+ ControlRequest(..),
+ ControlMessage(..),
+ erebosNetworkProtocol,
+
+ Connection,
+ connAddress,
+ connData,
+ connGetChannel,
+ connSetChannel,
+ connClose,
+
+ RawStreamReader, RawStreamWriter,
+ connAddWriteStream,
+ connAddReadStream,
+ readStreamToList,
+ readObjectsFromStream,
+ writeByteStringToStream,
+
+ module Erebos.Flow,
+) where
+
+import Control.Applicative
+import Control.Concurrent
+import Control.Concurrent.Async
+import Control.Concurrent.STM
+import Control.Monad
+import Control.Monad.Except
+import Control.Monad.Trans
+
+import Data.Bits
+import Data.ByteString (ByteString)
+import Data.ByteString qualified as B
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
+import Data.Function
+import Data.List
+import Data.Maybe
+import Data.Text (Text)
+import Data.Text qualified as T
+import Data.Void
+import Data.Word
+
+import System.Clock
+
+import Erebos.Channel
+import Erebos.Flow
+import Erebos.Identity
+import Erebos.Service
+import Erebos.Storage
+
+
+protocolVersion :: Text
+protocolVersion = T.pack "0.1"
+
+protocolVersions :: [Text]
+protocolVersions = [protocolVersion]
+
+
+data TransportPacket a = TransportPacket TransportHeader [a]
+
+data TransportHeader = TransportHeader [TransportHeaderItem]
+ deriving (Show)
+
+data TransportHeaderItem
+ = Acknowledged RefDigest
+ | AcknowledgedSingle Integer
+ | Rejected RefDigest
+ | ProtocolVersion Text
+ | Initiation RefDigest
+ | CookieSet Cookie
+ | CookieEcho Cookie
+ | DataRequest RefDigest
+ | DataResponse RefDigest
+ | AnnounceSelf RefDigest
+ | AnnounceUpdate RefDigest
+ | TrChannelRequest RefDigest
+ | TrChannelAccept RefDigest
+ | ServiceType ServiceID
+ | ServiceRef RefDigest
+ | StreamOpen Word8
+ deriving (Eq, Show)
+
+newtype Cookie = Cookie ByteString
+ deriving (Eq, Show)
+
+data SecurityRequirement = PlaintextOnly
+ | PlaintextAllowed
+ | EncryptedOnly
+ deriving (Eq, Ord)
+
+isHeaderItemAcknowledged :: TransportHeaderItem -> Bool
+isHeaderItemAcknowledged = \case
+ Acknowledged {} -> False
+ AcknowledgedSingle {} -> False
+ Rejected {} -> False
+ ProtocolVersion {} -> False
+ Initiation {} -> False
+ CookieSet {} -> False
+ CookieEcho {} -> False
+ _ -> True
+
+transportToObject :: PartialStorage -> TransportHeader -> PartialObject
+transportToObject st (TransportHeader items) = Rec $ map single items
+ where single = \case
+ Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst)
+ AcknowledgedSingle num -> (BC.pack "ACK", RecInt num)
+ Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)
+ ProtocolVersion ver -> (BC.pack "VER", RecText ver)
+ Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst)
+ CookieSet (Cookie bytes) -> (BC.pack "CKS", RecBinary bytes)
+ CookieEcho (Cookie bytes) -> (BC.pack "CKE", RecBinary bytes)
+ DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst)
+ DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst)
+ AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst)
+ AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst)
+ TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst)
+ TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst)
+ ServiceType stype -> (BC.pack "SVT", RecUUID $ toUUID stype)
+ ServiceRef dgst -> (BC.pack "SVR", RecRef $ partialRefFromDigest st dgst)
+ StreamOpen num -> (BC.pack "STO", RecInt $ fromIntegral num)
+
+transportFromObject :: PartialObject -> Maybe TransportHeader
+transportFromObject (Rec items) = case catMaybes $ map single items of
+ [] -> Nothing
+ titems -> Just $ TransportHeader titems
+ where single (name, content) = if
+ | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref
+ | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num
+ | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref
+ | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver
+ | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref
+ | name == BC.pack "CKS", RecBinary bytes <- content -> Just $ CookieSet (Cookie bytes)
+ | name == BC.pack "CKE", RecBinary bytes <- content -> Just $ CookieEcho (Cookie bytes)
+ | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref
+ | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref
+ | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref
+ | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref
+ | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref
+ | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref
+ | name == BC.pack "SVT", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid
+ | name == BC.pack "SVR", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref
+ | name == BC.pack "STO", RecInt num <- content -> Just $ StreamOpen $ fromIntegral num
+ | otherwise -> Nothing
+transportFromObject _ = Nothing
+
+
+data GlobalState addr = (Eq addr, Show addr) => GlobalState
+ { gIdentity :: TVar (UnifiedIdentity, [UnifiedIdentity])
+ , gConnections :: TVar [Connection addr]
+ , gDataFlow :: SymFlow (addr, ByteString)
+ , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)
+ , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
+ , gLog :: String -> STM ()
+ , gStorage :: PartialStorage
+ , gNowVar :: TVar TimeSpec
+ , gNextTimeout :: TVar TimeSpec
+ , gInitConfig :: Ref
+ }
+
+data Connection addr = Connection
+ { cGlobalState :: GlobalState addr
+ , cAddress :: addr
+ , cDataUp :: Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ , cDataInternal :: Flow
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ (Maybe (Bool, TransportPacket PartialObject))
+ , cChannel :: TVar ChannelState
+ , cCookie :: TVar (Maybe Cookie)
+ , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+ , cMaxInFlightPackets :: TVar Int
+ , cReservedPackets :: TVar Int
+ , cSentPackets :: TVar [SentPacket]
+ , cToAcknowledge :: TVar [Integer]
+ , cInStreams :: TVar [(Word8, Stream)]
+ , cOutStreams :: TVar [(Word8, Stream)]
+ }
+
+instance Eq (Connection addr) where
+ (==) = (==) `on` cChannel
+
+connAddress :: Connection addr -> addr
+connAddress = cAddress
+
+connData :: Connection addr -> Flow
+ (Maybe (Bool, TransportPacket PartialObject))
+ (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
+connData = cDataUp
+
+connGetChannel :: Connection addr -> STM ChannelState
+connGetChannel Connection {..} = readTVar cChannel
+
+connSetChannel :: Connection addr -> ChannelState -> STM ()
+connSetChannel Connection {..} ch = do
+ writeTVar cChannel ch
+
+connClose :: Connection addr -> STM ()
+connClose conn@Connection {..} = do
+ let GlobalState {..} = cGlobalState
+ readTVar cChannel >>= \case
+ ChannelClosed -> return ()
+ _ -> do
+ writeTVar cChannel ChannelClosed
+ writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+ writeFlow cDataInternal Nothing
+
+connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
+connAddWriteStream conn@Connection {..} = do
+ outStreams <- readTVar cOutStreams
+ let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)])
+ doInsert n (s@(n', _) : rest) | n == n' =
+ fmap (s:) <$> doInsert (n + 1) rest
+ doInsert n streams | n < 63 = lift $ do
+ sState <- newTVar StreamOpening
+ (sFlowIn, sFlowOut) <- newFlow
+ sNextSequence <- newTVar 0
+ sWaitingForAck <- newTVar 0
+ let info = (n, Stream {..})
+ return (info, info : streams)
+ doInsert _ _ = throwError "all outbound streams in use"
+
+ runExceptT $ do
+ ((streamNumber, stream), outStreams') <- doInsert 1 outStreams
+ lift $ writeTVar cOutStreams outStreams'
+ return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+
+ where
+ go gs@GlobalState {..} streamNumber stream = do
+ (reserved, msg) <- atomically $ do
+ readTVar (sState stream) >>= \case
+ StreamRunning -> return ()
+ _ -> retry
+ (,) <$> reservePacket conn
+ <*> readFlow (sFlowOut stream)
+
+ (plain, cont, onAck) <- case msg of
+ StreamData {..} -> do
+ return (stpData, True, return ())
+ StreamClosed {} -> do
+ atomically $ do
+ -- wait for ack on all sent stream data
+ waits <- readTVar (sWaitingForAck stream)
+ when (waits > 0) retry
+ return (BC.empty, False, streamClosed conn streamNumber)
+
+ let secure = True
+ plainAckedBy = []
+ mbReserved = Just reserved
+
+ mbch <- atomically (readTVar cChannel) >>= return . \case
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
+
+ mbs <- case mbch of
+ Just ch -> do
+ runExceptT (channelEncrypt ch $ B.concat
+ [ B.singleton streamNumber
+ , B.singleton (fromIntegral (stpSequence msg) :: Word8)
+ , plain
+ ] ) >>= \case
+ Right (ctext, counter) -> do
+ let isAcked = True
+ return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ return Nothing
+ Nothing | secure -> return Nothing
+ | otherwise -> return $ Just (plain, plainAckedBy)
+
+ case mbs of
+ Just (bs, ackedBy) -> do
+ atomically $ do
+ modifyTVar' (sWaitingForAck stream) (+ 1)
+ let mbReserved' = (\rs -> rs
+ { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ , rsOnAck = do
+ rsOnAck rs
+ onAck
+ atomically $ modifyTVar' (sWaitingForAck stream) (subtract 1)
+ }) <$> mbReserved
+ sendBytes conn mbReserved' bs
+ Nothing -> return ()
+
+ when cont $ go gs streamNumber stream
+
+connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader
+connAddReadStream Connection {..} streamNumber = do
+ inStreams <- readTVar cInStreams
+ let doInsert (s@(n, _) : rest)
+ | streamNumber < n = fmap (s:) <$> doInsert rest
+ | streamNumber == n = doInsert rest
+ doInsert streams = do
+ sState <- newTVar StreamRunning
+ (sFlowIn, sFlowOut) <- newFlow
+ sNextSequence <- newTVar 0
+ sWaitingForAck <- newTVar 0
+ let stream = Stream {..}
+ return (stream, (streamNumber, stream) : streams)
+ (stream, inStreams') <- doInsert inStreams
+ writeTVar cInStreams inStreams'
+ return $ sFlowOut stream
+
+
+type RawStreamReader = Flow StreamPacket Void
+type RawStreamWriter = Flow Void StreamPacket
+
+data Stream = Stream
+ { sState :: TVar StreamState
+ , sFlowIn :: Flow Void StreamPacket
+ , sFlowOut :: Flow StreamPacket Void
+ , sNextSequence :: TVar Word64
+ , sWaitingForAck :: TVar Word64
+ }
+
+data StreamState = StreamOpening | StreamRunning
+
+data StreamPacket
+ = StreamData
+ { stpSequence :: Word64
+ , stpData :: BC.ByteString
+ }
+ | StreamClosed
+ { stpSequence :: Word64
+ }
+
+streamAccepted :: Connection addr -> Word8 -> IO ()
+streamAccepted Connection {..} snum = atomically $ do
+ (lookup snum <$> readTVar cOutStreams) >>= \case
+ Just Stream {..} -> do
+ modifyTVar' sState $ \case
+ StreamOpening -> StreamRunning
+ x -> x
+ Nothing -> return ()
+
+streamClosed :: Connection addr -> Word8 -> IO ()
+streamClosed Connection {..} snum = atomically $ do
+ modifyTVar' cOutStreams $ filter ((snum /=) . fst)
+
+readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
+readStreamToList stream = readFlowIO stream >>= \case
+ StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
+ StreamClosed sqEnd -> return (sqEnd, [])
+
+readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject])
+readObjectsFromStream st stream = do
+ (seqEnd, list) <- readStreamToList stream
+ let validate s ((s', bytes) : rest)
+ | s == s' = (bytes : ) <$> validate (s + 1) rest
+ | s > s' = validate s rest
+ | otherwise = throwError "missing object chunk"
+ validate s []
+ | s == seqEnd = return []
+ | otherwise = throwError "content length mismatch"
+ return $ do
+ content <- BL.fromChunks <$> validate 0 list
+ deserializeObjects st content
+
+writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
+writeByteStringToStream stream = go 0
+ where
+ go seqNum bstr
+ | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | otherwise = do
+ let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
+ writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ go (seqNum + 1) rest
+
+
+data WaitingRef = WaitingRef
+ { wrefStorage :: Storage
+ , wrefPartial :: PartialRef
+ , wrefAction :: Ref -> WaitingRefCallback
+ , wrefStatus :: TVar (Either [RefDigest] Ref)
+ }
+
+type WaitingRefCallback = ExceptT String IO ()
+
+wrDigest :: WaitingRef -> RefDigest
+wrDigest = refDigest . wrefPartial
+
+
+data ChannelState = ChannelNone
+ | ChannelCookieWait -- sent initiation, waiting for response
+ | ChannelCookieReceived -- received cookie, but no cookie echo yet
+ | ChannelCookieConfirmed -- received cookie echo, no need to send from our side
+ | ChannelOurRequest (Stored ChannelRequest)
+ | ChannelPeerRequest WaitingRef
+ | ChannelOurAccept (Stored ChannelAccept) Channel
+ | ChannelEstablished Channel
+ | ChannelClosed
+
+data ReservedToSend = ReservedToSend
+ { rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
+ , rsOnAck :: IO ()
+ , rsOnFail :: IO ()
+ }
+
+data SentPacket = SentPacket
+ { spTime :: TimeSpec
+ , spRetryCount :: Int
+ , spAckedBy :: Maybe (TransportHeaderItem -> Bool)
+ , spOnAck :: IO ()
+ , spOnFail :: IO ()
+ , spData :: BC.ByteString
+ }
+
+
+data ControlRequest addr = RequestConnection addr
+ | SendAnnounce addr
+ | UpdateSelfIdentity UnifiedIdentity
+
+data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest)
+ | ReceivedAnnounce addr RefDigest
+
+
+erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
+ => UnifiedIdentity
+ -> (String -> STM ())
+ -> SymFlow (addr, ByteString)
+ -> Flow (ControlRequest addr) (ControlMessage addr)
+ -> IO ()
+erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
+ gIdentity <- newTVarIO (initialIdentity, [])
+ gConnections <- newTVarIO []
+ gNextUp <- newEmptyTMVarIO
+ mStorage <- memoryStorage
+ gStorage <- derivePartialStorage mStorage
+
+ startTime <- getTime MonotonicRaw
+ gNowVar <- newTVarIO startTime
+ gNextTimeout <- newTVarIO startTime
+ gInitConfig <- store mStorage $ (Rec [] :: Object)
+
+ let gs = GlobalState {..}
+
+ let signalTimeouts = forever $ do
+ now <- getTime MonotonicRaw
+ next <- atomically $ do
+ writeTVar gNowVar now
+ readTVar gNextTimeout
+
+ let waitTill time
+ | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
+ | otherwise = threadDelay maxBound
+ waitForUpdate = atomically $ do
+ next' <- readTVar gNextTimeout
+ when (next' == next) retry
+
+ race_ (waitTill next) waitForUpdate
+
+ race_ signalTimeouts $ forever $ join $ atomically $
+ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+
+
+getConnection :: GlobalState addr -> addr -> STM (Connection addr)
+getConnection gs addr = do
+ maybe (newConnection gs addr) return =<< findConnection gs addr
+
+findConnection :: GlobalState addr -> addr -> STM (Maybe (Connection addr))
+findConnection GlobalState {..} addr = do
+ find ((addr==) . cAddress) <$> readTVar gConnections
+
+newConnection :: GlobalState addr -> addr -> STM (Connection addr)
+newConnection cGlobalState@GlobalState {..} addr = do
+ conns <- readTVar gConnections
+
+ let cAddress = addr
+ (cDataUp, cDataInternal) <- newFlow
+ cChannel <- newTVar ChannelNone
+ cCookie <- newTVar Nothing
+ cSecureOutQueue <- newTQueue
+ cMaxInFlightPackets <- newTVar 4
+ cReservedPackets <- newTVar 0
+ cSentPackets <- newTVar []
+ cToAcknowledge <- newTVar []
+ cInStreams <- newTVar []
+ cOutStreams <- newTVar []
+ let conn = Connection {..}
+
+ writeTVar gConnections (conn : conns)
+ return conn
+
+passUpIncoming :: GlobalState addr -> STM (IO ())
+passUpIncoming GlobalState {..} = do
+ (Connection {..}, up) <- takeTMVar gNextUp
+ writeFlow cDataInternal (Just up)
+ return $ return ()
+
+processIncoming :: GlobalState addr -> STM (IO ())
+processIncoming gs@GlobalState {..} = do
+ guard =<< isEmptyTMVar gNextUp
+ guard =<< canWriteFlow gControlFlow
+
+ (addr, msg) <- readFlow gDataFlow
+ mbconn <- findConnection gs addr
+
+ mbch <- case mbconn of
+ Nothing -> return Nothing
+ Just conn -> readTVar (cChannel conn) >>= return . \case
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
+
+ return $ do
+ let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict
+ let parse = case B.uncons msg of
+ Just (b, enc)
+ | b .&. 0xE0 == 0x80 -> do
+ ch <- maybe (throwError "unexpected encrypted packet") return mbch
+ (dec, counter) <- channelDecrypt ch enc
+
+ case B.uncons dec of
+ Just (0x00, content) -> do
+ objs <- deserialize content
+ return $ Left (True, objs, Just counter)
+
+ Just (snum, dec')
+ | snum < 64
+ , Just (seq8, content) <- B.uncons dec'
+ -> do
+ return $ Right (snum, seq8, content, counter)
+
+ Just (_, _) -> do
+ throwError "unexpected stream header"
+
+ Nothing -> do
+ throwError "empty decrypted content"
+
+ | b .&. 0xE0 == 0x60 -> do
+ objs <- deserialize msg
+ return $ Left (False, objs, Nothing)
+
+ | otherwise -> throwError "invalid packet"
+
+ Nothing -> throwError "empty packet"
+
+ runExceptT parse >>= \case
+ Right (Left (secure, objs, mbcounter))
+ | hobj:content <- objs
+ , Just header@(TransportHeader items) <- transportFromObject hobj
+ -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case
+ Just (conn@Connection {..}, mbup) -> do
+ ioAfter <- atomically $ do
+ case mbcounter of
+ Just counter | any isHeaderItemAcknowledged items ->
+ modifyTVar' cToAcknowledge (fromIntegral counter :)
+ _ -> return ()
+ case mbup of
+ Just up -> putTMVar gNextUp (conn, (secure, up))
+ Nothing -> return ()
+ processAcknowledgements gs conn items
+ ioAfter
+ Nothing -> return ()
+
+ | otherwise -> atomically $ do
+ gLog $ show addr ++ ": invalid objects"
+ gLog $ show objs
+
+ Right (Right (snum, seq8, content, counter))
+ | Just Connection {..} <- mbconn
+ -> atomically $ do
+ (lookup snum <$> readTVar cInStreams) >>= \case
+ Nothing ->
+ gLog $ "unexpected stream number " ++ show snum
+
+ Just Stream {..} -> do
+ expectedSequence <- readTVar sNextSequence
+ let seqFull = expectedSequence - 0x80 + fromIntegral (seq8 - fromIntegral expectedSequence + 0x80 :: Word8)
+ sdata <- if
+ | B.null content -> do
+ modifyTVar' cInStreams $ filter ((/=snum) . fst)
+ return $ StreamClosed seqFull
+ | otherwise -> do
+ writeTVar sNextSequence $ max expectedSequence (seqFull + 1)
+ return $ StreamData seqFull content
+ writeFlow sFlowIn sdata
+ modifyTVar' cToAcknowledge (fromIntegral counter :)
+
+ | otherwise -> do
+ atomically $ gLog $ show addr <> ": stream packet without connection"
+
+ Left err -> do
+ atomically $ gLog $ show addr <> ": failed to parse packet: " <> err
+
+processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a)))
+processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if
+ -- Established secure communication
+ | Right conn <- econn, secure
+ -> return $ Just (conn, Just packet)
+
+ -- Plaintext communication with cookies to prove origin
+ | cookie:_ <- mapMaybe (\case CookieEcho x -> Just x; _ -> Nothing) header
+ -> verifyCookie gs addr cookie >>= \case
+ True -> do
+ atomically $ do
+ conn@Connection {..} <- getConnection gs addr
+ oldCookie <- readTVar cCookie
+ let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header
+ case received `mplus` oldCookie of
+ Just current -> do
+ writeTVar cCookie (Just current)
+ cookieEchoReceived gs conn mbpid
+ return $ Just (conn, Just packet)
+ Nothing -> do
+ gLog $ show addr <> ": missing cookie set, dropping " <> show header
+ return $ Nothing
+
+ False -> do
+ atomically $ gLog $ show addr <> ": cookie verification failed, dropping " <> show header
+ return Nothing
+
+ -- Response to initiation packet
+ | cookie:_ <- mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header
+ , Just _ <- version
+ , Right conn@Connection {..} <- econn
+ -> do
+ atomically $ readTVar cChannel >>= \case
+ ChannelCookieWait -> do
+ writeTVar cChannel $ ChannelCookieReceived
+ writeTVar cCookie $ Just cookie
+ writeFlow gControlFlow (NewConnection conn mbpid)
+ return $ Just (conn, Nothing)
+ _ -> return Nothing
+
+ -- Initiation packet
+ | _:_ <- mapMaybe (\case Initiation x -> Just x; _ -> Nothing) header
+ , Just ver <- version
+ -> do
+ cookie <- createCookie gs addr
+ atomically $ do
+ identity <- fst <$> readTVar gIdentity
+ let reply = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader
+ [ CookieSet cookie
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
+ , ProtocolVersion ver
+ ]
+ writeFlow gDataFlow (addr, reply)
+ return Nothing
+
+ -- Announce packet outside any connection
+ | dgst:_ <- mapMaybe (\case AnnounceSelf x -> Just x; _ -> Nothing) header
+ , Just _ <- version
+ -> do
+ atomically $ do
+ (cur, past) <- readTVar gIdentity
+ when (not $ dgst `elem` map (refDigest . storedRef . idData) (cur : past)) $ do
+ writeFlow gControlFlow $ ReceivedAnnounce addr dgst
+ return Nothing
+
+ | otherwise -> do
+ atomically $ gLog $ show addr <> ": dropping packet " <> show header
+ return Nothing
+
+ where
+ addr = either id cAddress econn
+ mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header
+ version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions
+
+cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM ()
+cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do
+ readTVar cChannel >>= \case
+ ChannelNone -> newConn
+ ChannelCookieWait -> newConn
+ ChannelCookieReceived {} -> update
+ _ -> return ()
+ where
+ update = do
+ writeTVar cChannel ChannelCookieConfirmed
+ newConn = do
+ update
+ writeFlow gControlFlow (NewConnection conn mbpid)
+
+generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem]
+generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ]
+ where
+ echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie)
+ setHeader = case ch of
+ ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
+ ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
+ _ -> return Nothing
+
+createCookie :: GlobalState addr -> addr -> IO Cookie
+createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)
+
+verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool
+verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie
+
+
+reservePacket :: Connection addr -> STM ReservedToSend
+reservePacket conn@Connection {..} = do
+ maxPackets <- readTVar cMaxInFlightPackets
+ reserved <- readTVar cReservedPackets
+ sent <- length <$> readTVar cSentPackets
+
+ when (sent + reserved >= maxPackets) $ do
+ retry
+
+ writeTVar cReservedPackets $ reserved + 1
+ return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
+
+resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
+resendBytes Connection {..} reserved sp = do
+ let GlobalState {..} = cGlobalState
+ now <- getTime MonotonicRaw
+ atomically $ do
+ when (isJust reserved) $ do
+ modifyTVar' cReservedPackets (subtract 1)
+
+ when (isJust $ spAckedBy sp) $ do
+ modifyTVar' cSentPackets $ (:) sp
+ { spTime = now
+ , spRetryCount = spRetryCount sp + 1
+ }
+ writeFlow gDataFlow (cAddress, spData sp)
+
+sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()
+sendBytes conn reserved bs = resendBytes conn reserved
+ SentPacket
+ { spTime = undefined
+ , spRetryCount = -1
+ , spAckedBy = rsAckedBy =<< reserved
+ , spOnAck = maybe (return ()) rsOnAck reserved
+ , spOnFail = maybe (return ()) rsOnFail reserved
+ , spData = bs
+ }
+
+processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
+processOutgoing gs@GlobalState {..} = do
+
+ let sendNextPacket :: Connection addr -> STM (IO ())
+ sendNextPacket conn@Connection {..} = do
+ channel <- readTVar cChannel
+ let mbch = case channel of
+ ChannelEstablished ch -> Just ch
+ _ -> Nothing
+
+ let checkOutstanding
+ | isJust mbch = do
+ (,) <$> readTQueue cSecureOutQueue <*> (Just <$> reservePacket conn)
+ | otherwise = retry
+
+ checkDataInternal = do
+ (,) <$> readFlow cDataInternal <*> (Just <$> reservePacket conn)
+
+ checkAcknowledgements
+ | isJust mbch = do
+ acks <- readTVar cToAcknowledge
+ if null acks then retry
+ else return ((EncryptedOnly, TransportPacket (TransportHeader []) [], []), Nothing)
+ | otherwise = retry
+
+ ((secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy), mbReserved) <-
+ checkOutstanding <|> checkDataInternal <|> checkAcknowledgements
+
+ when (isNothing mbch && secure >= EncryptedOnly) $ do
+ writeTQueue cSecureOutQueue (secure, packet, plainAckedBy)
+
+ acknowledge <- case mbch of
+ Nothing -> return []
+ Just _ -> swapTVar cToAcknowledge []
+
+ return $ do
+ let onAck = sequence_ $ map (streamAccepted conn) $
+ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
+
+ let mkPlain extraHeaders =
+ let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
+ in BL.concat $
+ (serializeObject $ transportToObject gStorage header)
+ : map lazyLoadBytes content
+
+ let usePlaintext = do
+ plain <- mkPlain <$> generateCookieHeaders conn channel
+ return $ Just (BL.toStrict plain, plainAckedBy)
+
+ let useEncryption ch = do
+ plain <- mkPlain <$> return []
+ runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case
+ Right (ctext, counter) -> do
+ let isAcked = any isHeaderItemAcknowledged hitems
+ return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ return Nothing
+
+ mbs <- case (secure, mbch) of
+ (PlaintextOnly, _) -> usePlaintext
+ (PlaintextAllowed, Nothing) -> usePlaintext
+ (_, Just ch) -> useEncryption ch
+ (EncryptedOnly, Nothing) -> return Nothing
+
+ case mbs of
+ Just (bs, ackedBy) -> do
+ let mbReserved' = (\rs -> rs
+ { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy)
+ , rsOnAck = rsOnAck rs >> onAck
+ }) <$> mbReserved
+ sendBytes conn mbReserved' bs
+ Nothing -> return ()
+
+ let retransmitPacket :: Connection addr -> STM (IO ())
+ retransmitPacket conn@Connection {..} = do
+ now <- readTVar gNowVar
+ (sp, rest) <- readTVar cSentPackets >>= \case
+ sps@(_:_) -> return (last sps, init sps)
+ _ -> retry
+ let nextTry = spTime sp + fromNanoSecs 1000000000
+ if | now < nextTry -> do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || nextTry < nextTimeout
+ then do writeTVar gNextTimeout nextTry
+ return $ return ()
+ else retry
+ | spRetryCount sp < 2 -> do
+ reserved <- reservePacket conn
+ writeTVar cSentPackets rest
+ return $ resendBytes conn (Just reserved) sp
+ | otherwise -> do
+ return $ spOnFail sp
+
+ let handleControlRequests = readFlow gControlFlow >>= \case
+ RequestConnection addr -> do
+ conn@Connection {..} <- getConnection gs addr
+ identity <- fst <$> readTVar gIdentity
+ readTVar cChannel >>= \case
+ ChannelNone -> do
+ reserved <- reservePacket conn
+ let packet = BL.toStrict $ BL.concat
+ [ serializeObject $ transportToObject gStorage $ TransportHeader $
+ [ Initiation $ refDigest gInitConfig
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ] ++ map ProtocolVersion protocolVersions
+ , lazyLoadBytes gInitConfig
+ ]
+ writeTVar cChannel ChannelCookieWait
+ let reserved' = reserved { rsAckedBy = Just $ \case CookieSet {} -> True; _ -> False }
+ return $ sendBytes conn (Just reserved') packet
+ _ -> return $ return ()
+
+ SendAnnounce addr -> do
+ identity <- fst <$> readTVar gIdentity
+ let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $
+ [ AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ] ++ map ProtocolVersion protocolVersions
+ writeFlow gDataFlow (addr, packet)
+ return $ return ()
+
+ UpdateSelfIdentity nid -> do
+ (cur, past) <- readTVar gIdentity
+ writeTVar gIdentity (nid, cur : past)
+ return $ return ()
+
+ conns <- readTVar gConnections
+ msum $ concat $
+ [ map retransmitPacket conns
+ , map sendNextPacket conns
+ , [ handleControlRequests ]
+ ]
+
+processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ())
+processAcknowledgements GlobalState {} Connection {..} header = do
+ (acked, notAcked) <- partition (\sp -> any (fromJust (spAckedBy sp)) header) <$> readTVar cSentPackets
+ writeTVar cSentPackets notAcked
+ return $ sequence_ $ map spOnAck acked