summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-09-29 09:38:40 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-11-16 10:12:49 +0100
commit116a34391a480d5b55041101b544526ee0dd8e3c (patch)
tree47d5db63447e273ccc1c2186dc437621fe7593cc
parentb3cd90262f521d9c207b395f175cd3b26d2f4363 (diff)
Network: avoid duplicated data requests
-rw-r--r--src/network.cpp47
-rw-r--r--src/network.h1
-rw-r--r--src/network/protocol.h2
3 files changed, 35 insertions, 15 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 409b829..26a07e3 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -445,9 +445,17 @@ void Server::Priv::doListen()
}
peer->updateService( reply, readyServices );
- if (!reply.header().empty())
+ if( not reply.header().empty() ) {
+ for( const auto & item : reply.header() ) {
+ if( const auto * req = get_if< NetworkProtocol::Header::DataRequest >( &item )) {
+ const auto & dgst = req->value;
+ peer->requestedData.push_back( dgst );
+ }
+ }
+
peer->connection.send(peer->partStorage,
NetworkProtocol::Header(reply.header()), reply.body(), false);
+ }
peer->connection.trySendOutQueue();
}
@@ -599,11 +607,14 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
reply.header({ Header::Acknowledged { dgst } });
if (peer.partStorage.loadObject( dgst )) {
+ peer.requestedData.erase(
+ std::remove( peer.requestedData.begin(), peer.requestedData.end(), dgst ),
+ peer.requestedData.end() );
for (auto & pwref : waiting) {
if (auto wref = pwref.lock()) {
if (std::find(wref->missing.begin(), wref->missing.end(), dgst) !=
wref->missing.end()) {
- if (wref->check(reply))
+ if( wref->check( reply, peer.requestedData ))
pwref.reset();
}
}
@@ -633,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
});
waiting.push_back(wref);
peer.identity = wref;
- wref->check(reply);
+ wref->check( reply, peer.requestedData );
}
}
@@ -648,7 +659,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
});
waiting.push_back(wref);
peer.identityUpdates.push_back(wref);
- wref->check(reply);
+ wref->check( reply, peer.requestedData );
}
}
@@ -673,7 +684,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
});
waiting.push_back(wref);
peer.connection.channel() = wref;
- wref->check(reply);
+ wref->check( reply, peer.requestedData );
}
}
@@ -721,7 +732,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
});
waiting.push_back(wref);
peer.serviceQueue.emplace_back(*serviceType, wref);
- wref->check(reply);
+ wref->check( reply, peer.requestedData );
}
}
}
@@ -797,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
}
if (holds_alternative<shared_ptr<WaitingRef>>(connection.channel())) {
- if (auto ref = std::get<shared_ptr<WaitingRef>>(connection.channel())->check(reply)) {
+ if( auto ref = std::get< shared_ptr< WaitingRef >>( connection.channel())->check( reply, requestedData )) {
auto req = Stored<ChannelRequest>::load(*ref);
if (holds_alternative<Identity>(identity) &&
req->isSignedBy(std::get<Identity>(identity).keyMessage())) {
@@ -834,7 +845,7 @@ void Server::Peer::updateService(ReplyBuilder & reply, vector<tuple<shared_ptr<e
{
decltype(serviceQueue) next;
for (auto & x : serviceQueue) {
- if (auto ref = std::get<1>(x)->check(reply)) {
+ if( auto ref = std::get<1>(x)->check( reply, requestedData )) {
if (lpeer) {
for (auto & svc : server.services) {
if (svc->uuid() == std::get<UUID>(x)) {
@@ -857,15 +868,20 @@ void Server::Peer::checkDataResponseStreams( ReplyBuilder & reply )
auto objects = PartialObject::decodeMany( partStorage, s->readAll() );
vector< PartialRef > refs;
refs.reserve( objects.size() );
- for( const auto & obj : objects )
- refs.push_back( partStorage.storeObject( obj ));
+ for( const auto & obj : objects ) {
+ auto ref = partStorage.storeObject( obj );
+ refs.push_back( ref );
+ requestedData.erase(
+ std::remove( requestedData.begin(), requestedData.end(), ref.digest() ),
+ requestedData.end() );
+ }
for( auto & pwref : server.waiting ) {
if (auto wref = pwref.lock()) {
for( const auto & ref : refs ) {
if( std::find( wref->missing.begin(), wref->missing.end(), ref.digest() ) !=
wref->missing.end() ) {
- if( wref->check( reply ) )
+ if( wref->check( reply, requestedData ) )
pwref.reset();
}
}
@@ -917,13 +933,16 @@ optional<Ref> WaitingRef::check()
return nullopt;
}
-optional<Ref> WaitingRef::check(ReplyBuilder & reply)
+optional<Ref> WaitingRef::check( ReplyBuilder & reply, const vector< Digest > & alreadyRequested)
{
if (auto r = check())
return r;
- for (const auto & d : missing)
- reply.header({ NetworkProtocol::Header::DataRequest { d } });
+ for( const auto & d : missing ) {
+ if( std::find( alreadyRequested.begin(), alreadyRequested.end(), d ) ==
+ alreadyRequested.end() )
+ reply.header({ NetworkProtocol::Header::DataRequest { d } });
+ }
return nullopt;
}
diff --git a/src/network.h b/src/network.h
index 12013fe..1fc2c87 100644
--- a/src/network.h
+++ b/src/network.h
@@ -55,6 +55,7 @@ struct Server::Peer
vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {};
vector< shared_ptr< NetworkProtocol::InStream >> dataResponseStreams {};
+ vector< Digest > requestedData {};
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
diff --git a/src/network/protocol.h b/src/network/protocol.h
index 2592c9f..f925af4 100644
--- a/src/network/protocol.h
+++ b/src/network/protocol.h
@@ -282,7 +282,7 @@ struct WaitingRef
vector<Digest> missing;
optional<Ref> check();
- optional<Ref> check(ReplyBuilder &);
+ optional<Ref> check( ReplyBuilder &, const vector< Digest > &);
};
}