summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Erebos/Network.hs82
-rw-r--r--src/Erebos/Network/Protocol.hs32
-rw-r--r--src/Erebos/Service.hs7
-rw-r--r--src/Erebos/Service/Stream.hs74
4 files changed, 157 insertions, 38 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 08f4e5c..0baeeb1 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -410,9 +410,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
@@ -527,9 +527,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -549,8 +547,8 @@ appendDistinct x (y:ys) | x == y = y : ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
+ -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
@@ -684,10 +682,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -812,7 +811,7 @@ notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer
receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
@@ -888,19 +887,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -940,10 +969,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService = runPeerServiceOn Nothing []
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -968,6 +997,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index f5183b3..025f52c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -23,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -288,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go cGlobalState streamNumber stream
+ )
where
go gs@GlobalState {..} streamNumber stream = do
@@ -361,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
+
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -403,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do
modifyTVar' cOutStreams $ filter ((snum /=) . fst)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
@@ -425,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index 50bded6..9030d04 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -113,10 +113,13 @@ data ServiceInput s = ServiceInput
, svcPeerIdentity :: UnifiedIdentity
, svcServer :: Server
, svcPrintOp :: String -> IO ()
+ , svcNewStreams :: [ RawStreamReader ]
}
-data ServiceReply s = ServiceReply (Either s (Stored s)) Bool
- | ServiceFinally (IO ())
+data ServiceReply s
+ = ServiceReply (Either s (Stored s)) Bool
+ | ServiceOpenStream (RawStreamWriter -> IO ())
+ | ServiceFinally (IO ())
data ServiceHandlerState s = ServiceHandlerState
{ svcValue :: ServiceState s
diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs
new file mode 100644
index 0000000..67df4d7
--- /dev/null
+++ b/src/Erebos/Service/Stream.hs
@@ -0,0 +1,74 @@
+module Erebos.Service.Stream (
+ StreamPacket(..),
+ StreamReader, getStreamReaderNumber,
+ StreamWriter, getStreamWriterNumber,
+ openStream, receivedStreams,
+ readStreamPacket, writeStreamPacket,
+ writeStream,
+ closeStream,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.Reader
+import Control.Monad.Writer
+
+import Data.ByteString (ByteString)
+import Data.Word
+
+import Erebos.Flow
+import Erebos.Network
+import Erebos.Network.Protocol
+import Erebos.Service
+
+
+data StreamReader = StreamReader RawStreamReader
+
+getStreamReaderNumber :: StreamReader -> IO Int
+getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
+
+data StreamWriter = StreamWriter (MVar StreamWriterData)
+
+data StreamWriterData = StreamWriterData
+ { swdStream :: RawStreamWriter
+ , swdSequence :: Maybe Word64
+ }
+
+getStreamWriterNumber :: StreamWriter -> IO Int
+getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
+
+
+openStream :: Service s => ServiceHandler s StreamWriter
+openStream = do
+ mvar <- liftIO newEmptyMVar
+ tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
+ return $ StreamWriter mvar
+
+receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
+receivedStreams = do
+ map StreamReader <$> asks svcNewStreams
+
+readStreamPacket :: StreamReader -> IO StreamPacket
+readStreamPacket (StreamReader stream) = do
+ readFlowIO (rsrFlow stream)
+
+writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
+writeStreamPacket (StreamWriter mvar) packet = do
+ withMVar mvar $ \swd -> do
+ writeFlowIO (rswFlow $ swdStream swd) packet
+
+writeStream :: StreamWriter -> ByteString -> IO ()
+writeStream (StreamWriter mvar) bytes = do
+ modifyMVar_ mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> do
+ writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
+ return swd { swdSequence = Just (seqNum + 1) }
+ Nothing -> do
+ fail "writeStream: stream closed"
+
+closeStream :: StreamWriter -> IO ()
+closeStream (StreamWriter mvar) = do
+ withMVar mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
+ Nothing -> fail "closeStream: stream already closed"