From 76d6f638df485d179899fa740b9bb53ee55ba7bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Fri, 28 Feb 2020 21:27:48 +0100 Subject: Network: avoid duplicit items in generated packet --- src/network.cpp | 80 ++++++++++++++++++++++++++++++++++++++------------------- src/network.h | 28 +++++++++++++------- 2 files changed, 72 insertions(+), 36 deletions(-) (limited to 'src') diff --git a/src/network.cpp b/src/network.cpp index f3a3651..9a9dc41 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -209,8 +209,8 @@ void Server::Priv::doListen() peer.updateIdentity(reply); peer.updateChannel(reply); - if (!reply.header.empty()) - peer.send(TransportHeader(reply.header), reply.body); + if (!reply.header().empty()) + peer.send(TransportHeader(reply.header()), reply.body()); } } else { std::cerr << "invalid packet\n"; @@ -292,8 +292,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - reply.header.push_back({ TransportHeader::Type::DataResponse, *ref }); - reply.body.push_back(**ref); + reply.header({ TransportHeader::Type::DataResponse, *ref }); + reply.body(*ref); } } break; @@ -301,12 +301,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea case TransportHeader::Type::DataResponse: if (auto pref = std::get(item.value)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != wref->missing.end()) { - if (wref->check(&reply.header)) + if (wref->check(reply)) pwref.reset(); } } @@ -322,7 +322,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; if (holds_alternative(peer.identity)) - reply.header.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); shared_ptr wref(new WaitingRef { .storage = peer.tempStorage, @@ -332,7 +332,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea }); waiting.push_back(wref); peer.identity = wref; - wref->check(&reply.header); + wref->check(reply); break; } @@ -341,7 +341,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea case TransportHeader::Type::ChannelRequest: if (auto pref = std::get(item.value)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); if (holds_alternative>(peer.channel) && std::get>(peer.channel).ref.digest() < pref.digest()) @@ -358,7 +358,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea }); waiting.push_back(wref); peer.channel = wref; - wref->check(&reply.header); + wref->check(reply); } break; @@ -371,7 +371,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea auto cres = peer.tempStorage.copy(pref); if (auto r = std::get_if(&cres)) { if (auto acc = ChannelAccept::load(*r)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); peer.channel.emplace>(acc->data->channel()); } } @@ -411,7 +411,7 @@ void Server::Peer::send(const TransportHeader & header, const vector & o void Server::Peer::updateIdentity(ReplyBuilder & reply) { if (holds_alternative>(identity)) - if (auto ref = std::get>(identity)->check(&reply.header)) + if (auto ref = std::get>(identity)->check(reply)) if (auto id = Identity::load(*ref)) { identity.emplace(*id); if (lpeer) @@ -428,25 +428,25 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) auto req = Channel::generateRequest(tempStorage, server.self, std::get(identity)); channel.emplace>(req); - reply.header.push_back({ TransportHeader::Type::ChannelRequest, req.ref }); - reply.body.push_back(*req.ref); - reply.body.push_back(*req->data.ref); - reply.body.push_back(*req->data->key.ref); + reply.header({ TransportHeader::Type::ChannelRequest, req.ref }); + reply.body(req.ref); + reply.body(req->data.ref); + reply.body(req->data->key.ref); for (const auto & sig : req->sigs) - reply.body.push_back(*sig.ref); + reply.body(sig.ref); } if (holds_alternative>(channel)) { - if (auto ref = std::get>(channel)->check(&reply.header)) { + if (auto ref = std::get>(channel)->check(reply)) { if (auto req = Stored::load(*ref)) { if (auto acc = Channel::acceptRequest(server.self, std::get(identity), *req)) { channel.emplace>(*acc); - reply.header.push_back({ TransportHeader::Type::ChannelAccept, acc->ref }); - reply.body.push_back(*acc->ref); - reply.body.push_back(*acc.value()->data.ref); - reply.body.push_back(*acc.value()->data->key.ref); + reply.header({ TransportHeader::Type::ChannelAccept, acc->ref }); + reply.body(acc->ref); + reply.body(acc.value()->data.ref); + reply.body(acc.value()->data->key.ref); for (const auto & sig : acc.value()->sigs) - reply.body.push_back(*sig.ref); + reply.body(sig.ref); } else { channel = monostate(); } @@ -457,7 +457,34 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } } -optional WaitingRef::check(vector * request) + +void ReplyBuilder::header(TransportHeader::Item && item) +{ + for (const auto & x : mheader) + if (x.type == item.type && x.value == item.value) + return; + mheader.emplace_back(std::move(item)); +} + +void ReplyBuilder::body(const Ref & ref) +{ + for (const auto & x : mbody) + if (x.digest() == ref.digest()) + return; + mbody.push_back(ref); +} + +vector ReplyBuilder::body() const +{ + vector res; + res.reserve(mbody.size()); + for (const Ref & ref : mbody) + res.push_back(*ref); + return res; +} + + +optional WaitingRef::check(ReplyBuilder & reply) { if (auto r = storage.ref(ref.digest())) return *r; @@ -467,9 +494,8 @@ optional WaitingRef::check(vector * request) return *r; missing = std::get>(res); - if (request) - for (const auto & d : missing) - request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + for (const auto & d : missing) + reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); return nullopt; } diff --git a/src/network.h b/src/network.h index 07b5363..13bb031 100644 --- a/src/network.h +++ b/src/network.h @@ -30,6 +30,8 @@ using chrono::steady_clock; namespace erebos { +class ReplyBuilder; + struct Server::Peer { Peer(const Peer &) = delete; @@ -54,8 +56,8 @@ struct Server::Peer shared_ptr lpeer = nullptr; void send(const struct TransportHeader &, const vector &) const; - void updateIdentity(struct ReplyBuilder &); - void updateChannel(struct ReplyBuilder &); + void updateIdentity(ReplyBuilder &); + void updateChannel(ReplyBuilder &); }; struct Peer::Priv : enable_shared_from_this @@ -103,6 +105,20 @@ struct TransportHeader const vector items; }; +class ReplyBuilder +{ +public: + void header(TransportHeader::Item &&); + void body(const Ref &); + + const vector & header() const { return mheader; } + vector body() const; + +private: + vector mheader; + vector mbody; +}; + struct WaitingRef { const Storage storage; @@ -110,13 +126,7 @@ struct WaitingRef const Server::Peer & peer; vector missing; - optional check(vector * request = nullptr); -}; - -struct ReplyBuilder -{ - vector header; - vector body; + optional check(ReplyBuilder &); }; struct Server::Priv -- cgit v1.2.3