summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs51
1 files changed, 48 insertions, 3 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index b26ada5..7c6a61e 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -30,7 +30,8 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
-import qualified Data.ByteString.Char8 as BC
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
@@ -179,6 +180,11 @@ lookupServiceType (ServiceType stype : _) = Just stype
lookupServiceType (_ : hs) = lookupServiceType hs
lookupServiceType [] = Nothing
+lookupNewStreams :: [TransportHeaderItem] -> [Word8]
+lookupNewStreams (StreamOpen num : rest) = num : lookupNewStreams rest
+lookupNewStreams (_ : rest) = lookupNewStreams rest
+lookupNewStreams [] = []
+
newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
newWaitingRef dgst act = do
@@ -421,6 +427,25 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+openStream :: PacketHandler RawStreamWriter
+openStream = do
+ Peer {..} <- gets phPeer
+ conn <- readTVarP peerConnection >>= \case
+ Right conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftSTM $ connAddWriteStream conn
+ liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
+ addHeader hdr
+ return writer
+
+acceptStream :: Word8 -> PacketHandler RawStreamReader
+acceptStream streamNumber = do
+ Peer {..} <- gets phPeer
+ conn <- readTVarP peerConnection >>= \case
+ Right conn -> return conn
+ _ -> throwError "can't accept stream without established connection"
+ liftSTM $ connAddReadStream conn streamNumber
+
appendDistinct :: Eq a => a -> [a] -> [a]
appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
@@ -461,7 +486,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
partialRefFromDigest (peerInStorage peer) dgst
addHeader $ DataResponse dgst
addAckedBy [ Acknowledged dgst, Rejected dgst ]
- addBody $ mref
+ let bytes = lazyLoadBytes mref
+ -- TODO: MTU
+ if (secure && BL.length bytes > 500)
+ then do
+ stream <- openStream
+ liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream bytes
+ else do
+ addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show dgst
addHeader $ Rejected dgst
@@ -471,6 +504,18 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
when (not secure) $ do
addHeader $ Acknowledged dgst
liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)
+
+ | streamNumber : _ <- lookupNewStreams headers -> do
+ streamReader <- acceptStream streamNumber
+ liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
+ (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case
+ Left err -> atomically $ writeTQueue (serverErrorLog server) $
+ "failed to receive object from stream: " <> err
+ Right objs -> do
+ forM_ objs $ \obj -> do
+ pref <- storeObject (peerInStorage peer) obj
+ atomically $ writeTQueue (serverDataResponse server) (peer, Just pref)
+
| otherwise -> throwError $ "mismatched data response " ++ show dgst
AnnounceSelf dgst
@@ -708,7 +753,7 @@ sendToPeerList peer parts = do
ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
ServiceFinally act -> act >> return Nothing
let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter snd srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
packet = TransportPacket header content
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]