summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-07-17 20:49:31 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-07-17 20:58:28 +0200
commit79655a00f7d7a95d9954947df50a8b772af8e703 (patch)
treeb20ffafd8deabbd339503d5620804f25e5236752 /src
parentdc8954b9ef7e0e230707777c1f640dd15e79dcd0 (diff)
Merge multiple data responses into a single stream
Changelog: Fix sending multiple data responses in a stream
Diffstat (limited to 'src')
-rw-r--r--src/Erebos/Network.hs45
1 files changed, 32 insertions, 13 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 3896829..402e163 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -19,7 +19,9 @@ module Erebos.Network (
#endif
dropPeer,
isPeerDropped,
- sendToPeer, sendToPeerStored, sendToPeerWith,
+ sendToPeer, sendManyToPeer,
+ sendToPeerStored, sendManyToPeerStored,
+ sendToPeerWith,
runPeerService,
discoveryPort,
@@ -422,12 +424,18 @@ instance MonadFail PacketHandler where
runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler secure peer@Peer {..} act = do
let logd = writeTQueue $ serverErrorLog peerServer_
- runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case
+ runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case
Left err -> do
logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
Right ph -> do
when (not $ null $ phHead ph) $ do
- let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ body <- case phBodyStream ph of
+ Nothing -> return $ phBody ph
+ Just stream -> do
+ writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph
+ return []
+ let packet = TransportPacket (TransportHeader $ phHead ph) body
secreq = case (secure, phPlaintextReply ph) of
(True, _) -> EncryptedOnly
(False, False) -> PlaintextAllowed
@@ -451,6 +459,7 @@ data PacketHandlerState = PacketHandlerState
, phHead :: [TransportHeaderItem]
, phAckedBy :: [TransportHeaderItem]
, phBody :: [Ref]
+ , phBodyStream :: Maybe RawStreamWriter
, phPlaintextReply :: Bool
}
@@ -463,6 +472,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+sendBodyAsStream :: PacketHandler ()
+sendBodyAsStream = do
+ gets phBodyStream >>= \case
+ Nothing -> do
+ stream <- openStream
+ modify $ \ph -> ph { phBodyStream = Just stream }
+ Just _ -> return ()
+
keepPlaintextReply :: PacketHandler ()
keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
@@ -533,15 +550,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
-- otherwise lost the channel, so keep the reply plaintext as well.
when (not secure) keepPlaintextReply
- let bytes = lazyLoadBytes mref
-- TODO: MTU
- if (secure && BL.length bytes > 500)
- then do
- stream <- openStream
- liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
- writeByteStringToStream stream bytes
- else do
- addBody $ mref
+ when (secure && BL.length (lazyLoadBytes mref) > 500)
+ sendBodyAsStream
+
+ addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show dgst
addHeader $ Rejected dgst
@@ -807,10 +820,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
_ -> return False
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
-sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
+sendToPeer peer = sendManyToPeer peer . (: [])
+
+sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m ()
+sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True)
sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
-sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True]
+sendToPeerStored peer = sendManyToPeerStored peer . (: [])
+
+sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
+sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do