summaryrefslogtreecommitdiff
path: root/src/Erebos/Network/Protocol.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r--src/Erebos/Network/Protocol.hs69
1 files changed, 52 insertions, 17 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index ac38588..f67e296 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -3,6 +3,7 @@ module Erebos.Network.Protocol (
transportToObject,
TransportHeader(..),
TransportHeaderItem(..),
+ ServiceID(..),
SecurityRequirement(..),
WaitingRef(..),
@@ -22,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -69,9 +71,9 @@ import Erebos.Flow
import Erebos.Identity
import Erebos.Network.Channel
import Erebos.Object
-import Erebos.Service
import Erebos.Storable
import Erebos.Storage
+import Erebos.UUID (UUID)
protocolVersion :: Text
@@ -108,6 +110,9 @@ data TransportHeaderItem
| StreamOpen Word8
deriving (Eq, Show)
+newtype ServiceID = ServiceID UUID
+ deriving (Eq, Ord, Show, StorableUUID)
+
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
@@ -208,6 +213,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
, gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)
, gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
, gLog :: String -> STM ()
+ , gTestLog :: String -> STM ()
, gStorage :: PartialStorage
, gStartTime :: TimeSpec
, gNowVar :: TVar TimeSpec
@@ -244,6 +250,12 @@ instance Eq (Connection addr) where
connAddress :: Connection addr -> addr
connAddress = cAddress
+showConnAddress :: forall addr. Connection addr -> String
+showConnAddress Connection {..} = helper cGlobalState cAddress
+ where
+ helper :: GlobalState addr -> addr -> String
+ helper GlobalState {} = show
+
connData :: Connection addr -> Flow
(Maybe (Bool, TransportPacket PartialObject))
(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
@@ -268,6 +280,7 @@ connClose conn@Connection {..} = do
connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
+ let GlobalState {..} = cGlobalState
outStreams <- readTVar cOutStreams
let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)])
doInsert n (s@(n', _) : rest) | n == n' =
@@ -284,10 +297,16 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams')
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go streamNumber stream
+ )
where
- go gs@GlobalState {..} streamNumber stream = do
+ go streamNumber stream = do
+ let GlobalState {..} = cGlobalState
(reserved, msg) <- atomically $ do
readTVar (sState stream) >>= \case
StreamRunning -> return ()
@@ -300,6 +319,8 @@ connAddWriteStream conn@Connection {..} = do
return (stpData, True, return ())
StreamClosed {} -> do
atomically $ do
+ gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber
+ atomically $ do
-- wait for ack on all sent stream data
waits <- readTVar (sWaitingForAck stream)
when (waits > 0) retry
@@ -343,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
- when cont $ go gs streamNumber stream
+ when cont $ go streamNumber stream
connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader
connAddReadStream Connection {..} streamNumber = do
@@ -357,14 +378,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
+
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -395,11 +423,13 @@ streamAccepted Connection {..} snum = atomically $ do
Nothing -> return ()
streamClosed :: Connection addr -> Word8 -> IO ()
-streamClosed Connection {..} snum = atomically $ do
- modifyTVar' cOutStreams $ filter ((snum /=) . fst)
+streamClosed conn@Connection {..} snum = atomically $ do
+ streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams
+ writeTVar cOutStreams streams
+ gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
@@ -421,10 +451,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
@@ -478,10 +508,11 @@ data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest)
erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
=> UnifiedIdentity
-> (String -> STM ())
+ -> (String -> STM ())
-> SymFlow (addr, ByteString)
-> Flow (ControlRequest addr) (ControlMessage addr)
-> IO ()
-erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
+erebosNetworkProtocol initialIdentity gLog gTestLog gDataFlow gControlFlow = do
gIdentity <- newTVarIO (initialIdentity, [])
gConnections <- newTVarIO []
gNextUp <- newEmptyTMVarIO
@@ -545,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
cOutStreams <- newTVar []
let conn = Connection {..}
+ gTestLog $ "net-conn-new " <> show cAddress
writeTVar gConnections (conn : conns)
return conn
@@ -901,7 +933,10 @@ processOutgoing gs@GlobalState {..} = do
, rsOnAck = rsOnAck rs >> onAck
}) <$> mbReserved
sendBytes conn mbReserved' bs
- Nothing -> return ()
+ Nothing -> do
+ when (isJust mbReserved) $ do
+ atomically $ do
+ modifyTVar' cReservedPackets (subtract 1)
let waitUntil :: TimeSpec -> TimeSpec -> STM ()
waitUntil now till = do