From e0a5dbf7164517c79940da5691745cd281e8557e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 2 Mar 2024 21:01:37 +0100 Subject: Network streams Changelog: Implemented streams in network protocol --- src/Erebos/Network.hs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) (limited to 'src/Erebos/Network.hs') diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index b26ada5..7c6a61e 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -30,7 +30,8 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State -import qualified Data.ByteString.Char8 as BC +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List @@ -179,6 +180,11 @@ lookupServiceType (ServiceType stype : _) = Just stype lookupServiceType (_ : hs) = lookupServiceType hs lookupServiceType [] = Nothing +lookupNewStreams :: [TransportHeaderItem] -> [Word8] +lookupNewStreams (StreamOpen num : rest) = num : lookupNewStreams rest +lookupNewStreams (_ : rest) = lookupNewStreams rest +lookupNewStreams [] = [] + newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef newWaitingRef dgst act = do @@ -421,6 +427,25 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +openStream :: PacketHandler RawStreamWriter +openStream = do + Peer {..} <- gets phPeer + conn <- readTVarP peerConnection >>= \case + Right conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftSTM $ connAddWriteStream conn + liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) + addHeader hdr + return writer + +acceptStream :: Word8 -> PacketHandler RawStreamReader +acceptStream streamNumber = do + Peer {..} <- gets phPeer + conn <- readTVarP peerConnection >>= \case + Right conn -> return conn + _ -> throwError "can't accept stream without established connection" + liftSTM $ connAddReadStream conn streamNumber + appendDistinct :: Eq a => a -> [a] -> [a] appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys @@ -461,7 +486,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = partialRefFromDigest (peerInStorage peer) dgst addHeader $ DataResponse dgst addAckedBy [ Acknowledged dgst, Rejected dgst ] - addBody $ mref + let bytes = lazyLoadBytes mref + -- TODO: MTU + if (secure && BL.length bytes > 500) + then do + stream <- openStream + liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream bytes + else do + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -471,6 +504,18 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = when (not secure) $ do addHeader $ Acknowledged dgst liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) + + | streamNumber : _ <- lookupNewStreams headers -> do + streamReader <- acceptStream streamNumber + liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do + (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case + Left err -> atomically $ writeTQueue (serverErrorLog server) $ + "failed to receive object from stream: " <> err + Right objs -> do + forM_ objs $ \obj -> do + pref <- storeObject (peerInStorage peer) obj + atomically $ writeTQueue (serverDataResponse server) (peer, Just pref) + | otherwise -> throwError $ "mismatched data response " ++ show dgst AnnounceSelf dgst @@ -708,7 +753,7 @@ sendToPeerList peer parts = do ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) ServiceFinally act -> act >> return Nothing let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter snd srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) packet = TransportPacket header content ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] -- cgit v1.2.3