diff options
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 32 |
1 files changed, 22 insertions, 10 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index f5183b3..025f52c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -23,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -288,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go cGlobalState streamNumber stream + ) where go gs@GlobalState {..} streamNumber stream = do @@ -361,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } + +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -403,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do modifyTVar' cOutStreams $ filter ((snum /=) . fst) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) @@ -425,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest |