diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-11-16 20:25:47 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-11-18 21:06:44 +0100 |
commit | 12497ed32f70a23552fd35161138b2e1812fc4f0 (patch) | |
tree | a3b2085ceacc56cfd7f553f42244a05bd9141e9d /src/network.cpp | |
parent | 47bd6afb103c5ddfb4a878e95416793610ed1be3 (diff) |
Network: use streams to send large objectsdevel
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/src/network.cpp b/src/network.cpp index 26a07e3..8455eea 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -252,7 +252,13 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const NetworkProtocol::Header::ServiceType { uuid }, NetworkProtocol::Header::ServiceRef { ref.digest() }, }); - speer->connection.send(speer->partStorage, move(header), { obj }, true); + + vector< Object > body; + if( obj.encode().size() + 2 * NetworkProtocol::Header::itemSize + <= speer->connection.mtu() ) + body.push_back( obj ); + + speer->connection.send( speer->partStorage, move(header), body, true ); return true; } @@ -454,7 +460,15 @@ void Server::Priv::doListen() } peer->connection.send(peer->partStorage, - NetworkProtocol::Header(reply.header()), reply.body(), false); + NetworkProtocol::Header( reply.header() ), + reply.stream() ? vector< Object >{} : reply.body(), false ); + if( reply.stream() ){ + for( const auto & obj : reply.body() ) { + auto part = obj.encode(); + reply.stream()->write( part.data(), part.size() ); + } + reply.stream()->close(); + } } peer->connection.trySendOutQueue(); @@ -597,6 +611,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head if (auto ref = peer.tempStorage.ref(dgst)) { reply.header({ NetworkProtocol::Header::DataResponse { ref->digest() } }); reply.body(*ref); + + if( holds_alternative< unique_ptr< Channel >>( peer.connection.channel() ) and + reply.size() > peer.connection.mtu() and not reply.stream() ) + reply.stream( peer.connection.openOutStream() ); } } } @@ -907,9 +925,17 @@ void ReplyBuilder::body(const Ref & ref) for (const auto & x : mbody) if (x.digest() == ref.digest()) return; + + bodySize += ref->encode().size(); mbody.push_back(ref); } +void ReplyBuilder::stream( shared_ptr< NetworkProtocol::OutStream > s ) +{ + mheader.emplace_back( Header::StreamOpen{ s->id }); + mstream = move( s ); +} + vector<Object> ReplyBuilder::body() const { vector<Object> res; @@ -919,6 +945,11 @@ vector<Object> ReplyBuilder::body() const return res; } +size_t ReplyBuilder::size() const +{ + return mheader.size() * Header::itemSize + bodySize; +} + optional<Ref> WaitingRef::check() { |