diff options
-rw-r--r-- | main/Test.hs | 6 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 45 | ||||
-rw-r--r-- | test/network.test | 56 |
3 files changed, 91 insertions, 16 deletions
diff --git a/main/Test.hs b/main/Test.hs index 6b6d1d4..97eaee7 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -501,11 +501,11 @@ cmdPeerList = do cmdTestMessageSend :: Command cmdTestMessageSend = do - [spidx, tref] <- asks tiParams + spidx : trefs <- asks tiParams st <- asks tiStorage - Just ref <- liftIO $ readRef st (encodeUtf8 tref) + Just refs <- liftIO $ fmap sequence $ mapM (readRef st . encodeUtf8) trefs peer <- getPeer spidx - sendToPeer peer $ TestMessage $ wrappedLoad ref + sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" cmdSharedStateGet :: Command diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 3896829..402e163 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -19,7 +19,9 @@ module Erebos.Network ( #endif dropPeer, isPeerDropped, - sendToPeer, sendToPeerStored, sendToPeerWith, + sendToPeer, sendManyToPeer, + sendToPeerStored, sendManyToPeerStored, + sendToPeerWith, runPeerService, discoveryPort, @@ -422,12 +424,18 @@ instance MonadFail PacketHandler where runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () runPacketHandler secure peer@Peer {..} act = do let logd = writeTQueue $ serverErrorLog peerServer_ - runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case + runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case Left err -> do logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err Right ph -> do when (not $ null $ phHead ph) $ do - let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + body <- case phBodyStream ph of + Nothing -> return $ phBody ph + Just stream -> do + writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph + return [] + let packet = TransportPacket (TransportHeader $ phHead ph) body secreq = case (secure, phPlaintextReply ph) of (True, _) -> EncryptedOnly (False, False) -> PlaintextAllowed @@ -451,6 +459,7 @@ data PacketHandlerState = PacketHandlerState , phHead :: [TransportHeaderItem] , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] + , phBodyStream :: Maybe RawStreamWriter , phPlaintextReply :: Bool } @@ -463,6 +472,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +sendBodyAsStream :: PacketHandler () +sendBodyAsStream = do + gets phBodyStream >>= \case + Nothing -> do + stream <- openStream + modify $ \ph -> ph { phBodyStream = Just stream } + Just _ -> return () + keepPlaintextReply :: PacketHandler () keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } @@ -533,15 +550,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = -- otherwise lost the channel, so keep the reply plaintext as well. when (not secure) keepPlaintextReply - let bytes = lazyLoadBytes mref -- TODO: MTU - if (secure && BL.length bytes > 500) - then do - stream <- openStream - liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do - writeByteStringToStream stream bytes - else do - addBody $ mref + when (secure && BL.length (lazyLoadBytes mref) > 500) + sendBodyAsStream + + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -807,10 +820,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () -sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] +sendToPeer peer = sendManyToPeer peer . (: []) + +sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m () +sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True) sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () -sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] +sendToPeerStored peer = sendManyToPeerStored peer . (: []) + +sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () +sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do diff --git a/test/network.test b/test/network.test index 9540bf6..efd508f 100644 --- a/test/network.test +++ b/test/network.test @@ -178,6 +178,62 @@ test ManyStreams: expect /test-message-received blob 100[2-4] $ref/ from p2 +test MultipleServiceRefs: + spawn as p1 + spawn as p2 + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server" to p1 + send "start-server" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + let kbytes = 2 + + with p1: + send "store blob" + send "A" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_a + + # Create blobs with (kbytes * 1000) bytes each + + send "store blob" + send "B" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_b + + send "store blob" + send "C" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_c + + send "store blob" + send "D" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_d + + send "test-message-send 1 $ref_a $ref_b $ref_c $ref_d" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $ref_a/ from p2 + expect /test-message-received blob [0-9]+ $ref_b/ from p2 + expect /test-message-received blob [0-9]+ $ref_c/ from p2 + expect /test-message-received blob [0-9]+ $ref_d/ from p2 + + test Reconnection: spawn as p1 with p1: |