diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-09-29 09:38:40 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-11-16 10:12:49 +0100 |
commit | 116a34391a480d5b55041101b544526ee0dd8e3c (patch) | |
tree | 47d5db63447e273ccc1c2186dc437621fe7593cc | |
parent | b3cd90262f521d9c207b395f175cd3b26d2f4363 (diff) |
Network: avoid duplicated data requests
-rw-r--r-- | src/network.cpp | 47 | ||||
-rw-r--r-- | src/network.h | 1 | ||||
-rw-r--r-- | src/network/protocol.h | 2 |
3 files changed, 35 insertions, 15 deletions
diff --git a/src/network.cpp b/src/network.cpp index 409b829..26a07e3 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -445,9 +445,17 @@ void Server::Priv::doListen() } peer->updateService( reply, readyServices ); - if (!reply.header().empty()) + if( not reply.header().empty() ) { + for( const auto & item : reply.header() ) { + if( const auto * req = get_if< NetworkProtocol::Header::DataRequest >( &item )) { + const auto & dgst = req->value; + peer->requestedData.push_back( dgst ); + } + } + peer->connection.send(peer->partStorage, NetworkProtocol::Header(reply.header()), reply.body(), false); + } peer->connection.trySendOutQueue(); } @@ -599,11 +607,14 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head reply.header({ Header::Acknowledged { dgst } }); if (peer.partStorage.loadObject( dgst )) { + peer.requestedData.erase( + std::remove( peer.requestedData.begin(), peer.requestedData.end(), dgst ), + peer.requestedData.end() ); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), dgst) != wref->missing.end()) { - if (wref->check(reply)) + if( wref->check( reply, peer.requestedData )) pwref.reset(); } } @@ -633,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.identity = wref; - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -648,7 +659,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.identityUpdates.push_back(wref); - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -673,7 +684,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.connection.channel() = wref; - wref->check(reply); + wref->check( reply, peer.requestedData ); } } @@ -721,7 +732,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head }); waiting.push_back(wref); peer.serviceQueue.emplace_back(*serviceType, wref); - wref->check(reply); + wref->check( reply, peer.requestedData ); } } } @@ -797,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } if (holds_alternative<shared_ptr<WaitingRef>>(connection.channel())) { - if (auto ref = std::get<shared_ptr<WaitingRef>>(connection.channel())->check(reply)) { + if( auto ref = std::get< shared_ptr< WaitingRef >>( connection.channel())->check( reply, requestedData )) { auto req = Stored<ChannelRequest>::load(*ref); if (holds_alternative<Identity>(identity) && req->isSignedBy(std::get<Identity>(identity).keyMessage())) { @@ -834,7 +845,7 @@ void Server::Peer::updateService(ReplyBuilder & reply, vector<tuple<shared_ptr<e { decltype(serviceQueue) next; for (auto & x : serviceQueue) { - if (auto ref = std::get<1>(x)->check(reply)) { + if( auto ref = std::get<1>(x)->check( reply, requestedData )) { if (lpeer) { for (auto & svc : server.services) { if (svc->uuid() == std::get<UUID>(x)) { @@ -857,15 +868,20 @@ void Server::Peer::checkDataResponseStreams( ReplyBuilder & reply ) auto objects = PartialObject::decodeMany( partStorage, s->readAll() ); vector< PartialRef > refs; refs.reserve( objects.size() ); - for( const auto & obj : objects ) - refs.push_back( partStorage.storeObject( obj )); + for( const auto & obj : objects ) { + auto ref = partStorage.storeObject( obj ); + refs.push_back( ref ); + requestedData.erase( + std::remove( requestedData.begin(), requestedData.end(), ref.digest() ), + requestedData.end() ); + } for( auto & pwref : server.waiting ) { if (auto wref = pwref.lock()) { for( const auto & ref : refs ) { if( std::find( wref->missing.begin(), wref->missing.end(), ref.digest() ) != wref->missing.end() ) { - if( wref->check( reply ) ) + if( wref->check( reply, requestedData ) ) pwref.reset(); } } @@ -917,13 +933,16 @@ optional<Ref> WaitingRef::check() return nullopt; } -optional<Ref> WaitingRef::check(ReplyBuilder & reply) +optional<Ref> WaitingRef::check( ReplyBuilder & reply, const vector< Digest > & alreadyRequested) { if (auto r = check()) return r; - for (const auto & d : missing) - reply.header({ NetworkProtocol::Header::DataRequest { d } }); + for( const auto & d : missing ) { + if( std::find( alreadyRequested.begin(), alreadyRequested.end(), d ) == + alreadyRequested.end() ) + reply.header({ NetworkProtocol::Header::DataRequest { d } }); + } return nullopt; } diff --git a/src/network.h b/src/network.h index 12013fe..1fc2c87 100644 --- a/src/network.h +++ b/src/network.h @@ -55,6 +55,7 @@ struct Server::Peer vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {}; vector< shared_ptr< NetworkProtocol::InStream >> dataResponseStreams {}; + vector< Digest > requestedData {}; shared_ptr<erebos::Peer::Priv> lpeer = nullptr; diff --git a/src/network/protocol.h b/src/network/protocol.h index 2592c9f..f925af4 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -282,7 +282,7 @@ struct WaitingRef vector<Digest> missing; optional<Ref> check(); - optional<Ref> check(ReplyBuilder &); + optional<Ref> check( ReplyBuilder &, const vector< Digest > &); }; } |