summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-07-17 20:49:31 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-07-17 20:58:28 +0200
commit79655a00f7d7a95d9954947df50a8b772af8e703 (patch)
treeb20ffafd8deabbd339503d5620804f25e5236752
parentdc8954b9ef7e0e230707777c1f640dd15e79dcd0 (diff)
Merge multiple data responses into a single stream
Changelog: Fix sending multiple data responses in a stream
-rw-r--r--main/Test.hs6
-rw-r--r--src/Erebos/Network.hs45
-rw-r--r--test/network.test56
3 files changed, 91 insertions, 16 deletions
diff --git a/main/Test.hs b/main/Test.hs
index 6b6d1d4..97eaee7 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -501,11 +501,11 @@ cmdPeerList = do
cmdTestMessageSend :: Command
cmdTestMessageSend = do
- [spidx, tref] <- asks tiParams
+ spidx : trefs <- asks tiParams
st <- asks tiStorage
- Just ref <- liftIO $ readRef st (encodeUtf8 tref)
+ Just refs <- liftIO $ fmap sequence $ mapM (readRef st . encodeUtf8) trefs
peer <- getPeer spidx
- sendToPeer peer $ TestMessage $ wrappedLoad ref
+ sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
cmdSharedStateGet :: Command
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 3896829..402e163 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -19,7 +19,9 @@ module Erebos.Network (
#endif
dropPeer,
isPeerDropped,
- sendToPeer, sendToPeerStored, sendToPeerWith,
+ sendToPeer, sendManyToPeer,
+ sendToPeerStored, sendManyToPeerStored,
+ sendToPeerWith,
runPeerService,
discoveryPort,
@@ -422,12 +424,18 @@ instance MonadFail PacketHandler where
runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler secure peer@Peer {..} act = do
let logd = writeTQueue $ serverErrorLog peerServer_
- runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case
+ runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case
Left err -> do
logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
Right ph -> do
when (not $ null $ phHead ph) $ do
- let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ body <- case phBodyStream ph of
+ Nothing -> return $ phBody ph
+ Just stream -> do
+ writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph
+ return []
+ let packet = TransportPacket (TransportHeader $ phHead ph) body
secreq = case (secure, phPlaintextReply ph) of
(True, _) -> EncryptedOnly
(False, False) -> PlaintextAllowed
@@ -451,6 +459,7 @@ data PacketHandlerState = PacketHandlerState
, phHead :: [TransportHeaderItem]
, phAckedBy :: [TransportHeaderItem]
, phBody :: [Ref]
+ , phBodyStream :: Maybe RawStreamWriter
, phPlaintextReply :: Bool
}
@@ -463,6 +472,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+sendBodyAsStream :: PacketHandler ()
+sendBodyAsStream = do
+ gets phBodyStream >>= \case
+ Nothing -> do
+ stream <- openStream
+ modify $ \ph -> ph { phBodyStream = Just stream }
+ Just _ -> return ()
+
keepPlaintextReply :: PacketHandler ()
keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
@@ -533,15 +550,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
-- otherwise lost the channel, so keep the reply plaintext as well.
when (not secure) keepPlaintextReply
- let bytes = lazyLoadBytes mref
-- TODO: MTU
- if (secure && BL.length bytes > 500)
- then do
- stream <- openStream
- liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
- writeByteStringToStream stream bytes
- else do
- addBody $ mref
+ when (secure && BL.length (lazyLoadBytes mref) > 500)
+ sendBodyAsStream
+
+ addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show dgst
addHeader $ Rejected dgst
@@ -807,10 +820,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
_ -> return False
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
-sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
+sendToPeer peer = sendManyToPeer peer . (: [])
+
+sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m ()
+sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True)
sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
-sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True]
+sendToPeerStored peer = sendManyToPeerStored peer . (: [])
+
+sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
+sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
diff --git a/test/network.test b/test/network.test
index 9540bf6..efd508f 100644
--- a/test/network.test
+++ b/test/network.test
@@ -178,6 +178,62 @@ test ManyStreams:
expect /test-message-received blob 100[2-4] $ref/ from p2
+test MultipleServiceRefs:
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server" to p1
+ send "start-server" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ let kbytes = 2
+
+ with p1:
+ send "store blob"
+ send "A"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_a
+
+ # Create blobs with (kbytes * 1000) bytes each
+
+ send "store blob"
+ send "B"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_b
+
+ send "store blob"
+ send "C"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_c
+
+ send "store blob"
+ send "D"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_d
+
+ send "test-message-send 1 $ref_a $ref_b $ref_c $ref_d"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $ref_a/ from p2
+ expect /test-message-received blob [0-9]+ $ref_b/ from p2
+ expect /test-message-received blob [0-9]+ $ref_c/ from p2
+ expect /test-message-received blob [0-9]+ $ref_d/ from p2
+
+
test Reconnection:
spawn as p1
with p1: