summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs82
1 files changed, 56 insertions, 26 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 08f4e5c..0baeeb1 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -410,9 +410,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
@@ -527,9 +527,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -549,8 +547,8 @@ appendDistinct x (y:ys) | x == y = y : ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
+ -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
@@ -684,10 +682,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -812,7 +811,7 @@ notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer
receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
@@ -888,19 +887,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -940,10 +969,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService = runPeerServiceOn Nothing []
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -968,6 +997,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do