summaryrefslogtreecommitdiff
path: root/src/Erebos/Service/Stream.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-05-25 21:37:40 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2025-05-31 17:09:53 +0200
commitd9800045d572358526bf18688f06a4cfa4f99772 (patch)
tree4f415d7d2c6f5b03889c56320140fc1ffd3df08b /src/Erebos/Service/Stream.hs
parentcd766373e829de6e877f46458bab82a355092972 (diff)
Network streams for services
Diffstat (limited to 'src/Erebos/Service/Stream.hs')
-rw-r--r--src/Erebos/Service/Stream.hs74
1 files changed, 74 insertions, 0 deletions
diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs
new file mode 100644
index 0000000..67df4d7
--- /dev/null
+++ b/src/Erebos/Service/Stream.hs
@@ -0,0 +1,74 @@
+module Erebos.Service.Stream (
+ StreamPacket(..),
+ StreamReader, getStreamReaderNumber,
+ StreamWriter, getStreamWriterNumber,
+ openStream, receivedStreams,
+ readStreamPacket, writeStreamPacket,
+ writeStream,
+ closeStream,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.Reader
+import Control.Monad.Writer
+
+import Data.ByteString (ByteString)
+import Data.Word
+
+import Erebos.Flow
+import Erebos.Network
+import Erebos.Network.Protocol
+import Erebos.Service
+
+
+data StreamReader = StreamReader RawStreamReader
+
+getStreamReaderNumber :: StreamReader -> IO Int
+getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
+
+data StreamWriter = StreamWriter (MVar StreamWriterData)
+
+data StreamWriterData = StreamWriterData
+ { swdStream :: RawStreamWriter
+ , swdSequence :: Maybe Word64
+ }
+
+getStreamWriterNumber :: StreamWriter -> IO Int
+getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
+
+
+openStream :: Service s => ServiceHandler s StreamWriter
+openStream = do
+ mvar <- liftIO newEmptyMVar
+ tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
+ return $ StreamWriter mvar
+
+receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
+receivedStreams = do
+ map StreamReader <$> asks svcNewStreams
+
+readStreamPacket :: StreamReader -> IO StreamPacket
+readStreamPacket (StreamReader stream) = do
+ readFlowIO (rsrFlow stream)
+
+writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
+writeStreamPacket (StreamWriter mvar) packet = do
+ withMVar mvar $ \swd -> do
+ writeFlowIO (rswFlow $ swdStream swd) packet
+
+writeStream :: StreamWriter -> ByteString -> IO ()
+writeStream (StreamWriter mvar) bytes = do
+ modifyMVar_ mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> do
+ writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
+ return swd { swdSequence = Just (seqNum + 1) }
+ Nothing -> do
+ fail "writeStream: stream closed"
+
+closeStream :: StreamWriter -> IO ()
+closeStream (StreamWriter mvar) = do
+ withMVar mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
+ Nothing -> fail "closeStream: stream already closed"