From 116a34391a480d5b55041101b544526ee0dd8e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 29 Sep 2024 09:38:40 +0200 Subject: Network: avoid duplicated data requests --- src/network.cpp | 47 +++++++++++++++++++++++++++++++++-------------- src/network.h | 1 + src/network/protocol.h | 2 +- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/network.cpp b/src/network.cpp index 409b829..26a07e3 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -445,9 +445,17 @@ void Server::Priv::doListen() } peer->updateService( reply, readyServices ); - if (!reply.header().empty()) + if( not reply.header().empty() ) { + for( const auto & item : reply.header() ) { + if( const auto * req = get_if< NetworkProtocol::Header::DataRequest >( &item )) { + const auto & dgst = req->value; + peer->requestedData.push_back( dgst ); + } + } + peer->connection.send(peer->partStorage, NetworkProtocol::Header(reply.header()), reply.body(), false); + } peer->connection.trySendOutQueue(); } @@ -599,11 +607,14 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head reply.header({ Header::Acknowledged { dgst } }); if (peer.partStorage.loadObject( dgst )) { + peer.requestedData.erase( + std::remove( peer.requestedData.begin(), peer.requestedData.end(), dgst ), + peer.requestedData.end() ); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), dgst) != wref->missing.end()) { - if (wref->check(reply)) + if( wref->check( reply, peer.requestedData )) pwref.reset(); } } @@ -633,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.identity = wref; - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -648,7 +659,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.identityUpdates.push_back(wref); - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -673,7 +684,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.connection.channel() = wref; - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -721,7 +732,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.serviceQueue.emplace_back(*serviceType, wref); - wref->check(reply); + wref->check( reply, peer.requestedData ); } } } @@ -797,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } if (holds_alternative>(connection.channel())) { - if (auto ref = std::get>(connection.channel())->check(reply)) { + if( auto ref = std::get< shared_ptr< WaitingRef >>( connection.channel())->check( reply, requestedData )) { auto req = Stored::load(*ref); if (holds_alternative(identity) && req->isSignedBy(std::get(identity).keyMessage())) { @@ -834,7 +845,7 @@ void Server::Peer::updateService(ReplyBuilder & reply, vector(x)->check(reply)) { + if( auto ref = std::get<1>(x)->check( reply, requestedData )) { if (lpeer) { for (auto & svc : server.services) { if (svc->uuid() == std::get(x)) { @@ -857,15 +868,20 @@ void Server::Peer::checkDataResponseStreams( ReplyBuilder & reply ) auto objects = PartialObject::decodeMany( partStorage, s->readAll() ); vector< PartialRef > refs; refs.reserve( objects.size() ); - for( const auto & obj : objects ) - refs.push_back( partStorage.storeObject( obj )); + for( const auto & obj : objects ) { + auto ref = partStorage.storeObject( obj ); + refs.push_back( ref ); + requestedData.erase( + std::remove( requestedData.begin(), requestedData.end(), ref.digest() ), + requestedData.end() ); + } for( auto & pwref : server.waiting ) { if (auto wref = pwref.lock()) { for( const auto & ref : refs ) { if( std::find( wref->missing.begin(), wref->missing.end(), ref.digest() ) != wref->missing.end() ) { - if( wref->check( reply ) ) + if( wref->check( reply, requestedData ) ) pwref.reset(); } } @@ -917,13 +933,16 @@ optional WaitingRef::check() return nullopt; } -optional WaitingRef::check(ReplyBuilder & reply) +optional WaitingRef::check( ReplyBuilder & reply, const vector< Digest > & alreadyRequested) { if (auto r = check()) return r; - for (const auto & d : missing) - reply.header({ NetworkProtocol::Header::DataRequest { d } }); + for( const auto & d : missing ) { + if( std::find( alreadyRequested.begin(), alreadyRequested.end(), d ) == + alreadyRequested.end() ) + reply.header({ NetworkProtocol::Header::DataRequest { d } }); + } return nullopt; } diff --git a/src/network.h b/src/network.h index 12013fe..1fc2c87 100644 --- a/src/network.h +++ b/src/network.h @@ -55,6 +55,7 @@ struct Server::Peer vector>> serviceQueue {}; vector< shared_ptr< NetworkProtocol::InStream >> dataResponseStreams {}; + vector< Digest > requestedData {}; shared_ptr lpeer = nullptr; diff --git a/src/network/protocol.h b/src/network/protocol.h index 2592c9f..f925af4 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -282,7 +282,7 @@ struct WaitingRef vector missing; optional check(); - optional check(ReplyBuilder &); + optional check( ReplyBuilder &, const vector< Digest > &); }; } -- cgit v1.2.3