blob: 67df4d7c4b02c4dc7604d20bc995fc3d2f932b04 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
module Erebos.Service.Stream (
StreamPacket(..),
StreamReader, getStreamReaderNumber,
StreamWriter, getStreamWriterNumber,
openStream, receivedStreams,
readStreamPacket, writeStreamPacket,
writeStream,
closeStream,
) where
import Control.Concurrent.MVar
import Control.Monad.Reader
import Control.Monad.Writer
import Data.ByteString (ByteString)
import Data.Word
import Erebos.Flow
import Erebos.Network
import Erebos.Network.Protocol
import Erebos.Service
data StreamReader = StreamReader RawStreamReader
getStreamReaderNumber :: StreamReader -> IO Int
getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
data StreamWriter = StreamWriter (MVar StreamWriterData)
data StreamWriterData = StreamWriterData
{ swdStream :: RawStreamWriter
, swdSequence :: Maybe Word64
}
getStreamWriterNumber :: StreamWriter -> IO Int
getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
openStream :: Service s => ServiceHandler s StreamWriter
openStream = do
mvar <- liftIO newEmptyMVar
tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
return $ StreamWriter mvar
receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
receivedStreams = do
map StreamReader <$> asks svcNewStreams
readStreamPacket :: StreamReader -> IO StreamPacket
readStreamPacket (StreamReader stream) = do
readFlowIO (rsrFlow stream)
writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
writeStreamPacket (StreamWriter mvar) packet = do
withMVar mvar $ \swd -> do
writeFlowIO (rswFlow $ swdStream swd) packet
writeStream :: StreamWriter -> ByteString -> IO ()
writeStream (StreamWriter mvar) bytes = do
modifyMVar_ mvar $ \swd -> do
case swdSequence swd of
Just seqNum -> do
writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
return swd { swdSequence = Just (seqNum + 1) }
Nothing -> do
fail "writeStream: stream closed"
closeStream :: StreamWriter -> IO ()
closeStream (StreamWriter mvar) = do
withMVar mvar $ \swd -> do
case swdSequence swd of
Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
Nothing -> fail "closeStream: stream already closed"
|