summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp35
1 files changed, 33 insertions, 2 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 26a07e3..8455eea 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -252,7 +252,13 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
NetworkProtocol::Header::ServiceType { uuid },
NetworkProtocol::Header::ServiceRef { ref.digest() },
});
- speer->connection.send(speer->partStorage, move(header), { obj }, true);
+
+ vector< Object > body;
+ if( obj.encode().size() + 2 * NetworkProtocol::Header::itemSize
+ <= speer->connection.mtu() )
+ body.push_back( obj );
+
+ speer->connection.send( speer->partStorage, move(header), body, true );
return true;
}
@@ -454,7 +460,15 @@ void Server::Priv::doListen()
}
peer->connection.send(peer->partStorage,
- NetworkProtocol::Header(reply.header()), reply.body(), false);
+ NetworkProtocol::Header( reply.header() ),
+ reply.stream() ? vector< Object >{} : reply.body(), false );
+ if( reply.stream() ){
+ for( const auto & obj : reply.body() ) {
+ auto part = obj.encode();
+ reply.stream()->write( part.data(), part.size() );
+ }
+ reply.stream()->close();
+ }
}
peer->connection.trySendOutQueue();
@@ -597,6 +611,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
if (auto ref = peer.tempStorage.ref(dgst)) {
reply.header({ NetworkProtocol::Header::DataResponse { ref->digest() } });
reply.body(*ref);
+
+ if( holds_alternative< unique_ptr< Channel >>( peer.connection.channel() ) and
+ reply.size() > peer.connection.mtu() and not reply.stream() )
+ reply.stream( peer.connection.openOutStream() );
}
}
}
@@ -907,9 +925,17 @@ void ReplyBuilder::body(const Ref & ref)
for (const auto & x : mbody)
if (x.digest() == ref.digest())
return;
+
+ bodySize += ref->encode().size();
mbody.push_back(ref);
}
+void ReplyBuilder::stream( shared_ptr< NetworkProtocol::OutStream > s )
+{
+ mheader.emplace_back( Header::StreamOpen{ s->id });
+ mstream = move( s );
+}
+
vector<Object> ReplyBuilder::body() const
{
vector<Object> res;
@@ -919,6 +945,11 @@ vector<Object> ReplyBuilder::body() const
return res;
}
+size_t ReplyBuilder::size() const
+{
+ return mheader.size() * Header::itemSize + bodySize;
+}
+
optional<Ref> WaitingRef::check()
{