summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-11-16 20:25:47 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-01-17 21:51:27 +0100
commita514bb99355bafa3e3ee323d1451c1e7c2d8ca74 (patch)
tree3749adcb1cb3e8335aad3d53f5cd59081bad2178 /src/network.cpp
parent47bd6afb103c5ddfb4a878e95416793610ed1be3 (diff)
Network: use streams to send large objects
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp36
1 files changed, 34 insertions, 2 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 26a07e3..22b292a 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -252,7 +252,13 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
NetworkProtocol::Header::ServiceType { uuid },
NetworkProtocol::Header::ServiceRef { ref.digest() },
});
- speer->connection.send(speer->partStorage, move(header), { obj }, true);
+
+ vector< Object > body;
+ if( obj.encode().size() + 2 * NetworkProtocol::Header::itemSize
+ <= speer->connection.mtu() )
+ body.push_back( obj );
+
+ speer->connection.send( speer->partStorage, move(header), body, true );
return true;
}
@@ -442,6 +448,7 @@ void Server::Priv::doListen()
peer->updateChannel( reply );
} else {
peer->checkDataResponseStreams( reply );
+ peer->updateIdentity( reply, notifyPeers );
}
peer->updateService( reply, readyServices );
@@ -454,7 +461,15 @@ void Server::Priv::doListen()
}
peer->connection.send(peer->partStorage,
- NetworkProtocol::Header(reply.header()), reply.body(), false);
+ NetworkProtocol::Header( reply.header() ),
+ reply.stream() ? vector< Object >{} : reply.body(), false );
+ if( reply.stream() ){
+ for( const auto & obj : reply.body() ) {
+ auto part = obj.encode();
+ reply.stream()->write( part.data(), part.size() );
+ }
+ reply.stream()->close();
+ }
}
peer->connection.trySendOutQueue();
@@ -597,6 +612,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
if (auto ref = peer.tempStorage.ref(dgst)) {
reply.header({ NetworkProtocol::Header::DataResponse { ref->digest() } });
reply.body(*ref);
+
+ if( holds_alternative< unique_ptr< Channel >>( peer.connection.channel() ) and
+ reply.size() > peer.connection.mtu() and not reply.stream() )
+ reply.stream( peer.connection.openOutStream() );
}
}
}
@@ -907,9 +926,17 @@ void ReplyBuilder::body(const Ref & ref)
for (const auto & x : mbody)
if (x.digest() == ref.digest())
return;
+
+ bodySize += ref->encode().size();
mbody.push_back(ref);
}
+void ReplyBuilder::stream( shared_ptr< NetworkProtocol::OutStream > s )
+{
+ mheader.emplace_back( Header::StreamOpen{ s->id });
+ mstream = move( s );
+}
+
vector<Object> ReplyBuilder::body() const
{
vector<Object> res;
@@ -919,6 +946,11 @@ vector<Object> ReplyBuilder::body() const
return res;
}
+size_t ReplyBuilder::size() const
+{
+ return mheader.size() * Header::itemSize + bodySize;
+}
+
optional<Ref> WaitingRef::check()
{