diff options
-rw-r--r-- | src/network.cpp | 80 | ||||
-rw-r--r-- | src/network.h | 28 |
2 files changed, 72 insertions, 36 deletions
diff --git a/src/network.cpp b/src/network.cpp index f3a3651..9a9dc41 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -209,8 +209,8 @@ void Server::Priv::doListen() peer.updateIdentity(reply); peer.updateChannel(reply); - if (!reply.header.empty()) - peer.send(TransportHeader(reply.header), reply.body); + if (!reply.header().empty()) + peer.send(TransportHeader(reply.header()), reply.body()); } } else { std::cerr << "invalid packet\n"; @@ -292,8 +292,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - reply.header.push_back({ TransportHeader::Type::DataResponse, *ref }); - reply.body.push_back(**ref); + reply.header({ TransportHeader::Type::DataResponse, *ref }); + reply.body(*ref); } } break; @@ -301,12 +301,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea case TransportHeader::Type::DataResponse: if (auto pref = std::get<PartialRef>(item.value)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != wref->missing.end()) { - if (wref->check(&reply.header)) + if (wref->check(reply)) pwref.reset(); } } @@ -322,7 +322,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; if (holds_alternative<monostate>(peer.identity)) - reply.header.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, @@ -332,7 +332,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea }); waiting.push_back(wref); peer.identity = wref; - wref->check(&reply.header); + wref->check(reply); break; } @@ -341,7 +341,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea case TransportHeader::Type::ChannelRequest: if (auto pref = std::get<PartialRef>(item.value)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && std::get<Stored<ChannelRequest>>(peer.channel).ref.digest() < pref.digest()) @@ -358,7 +358,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea }); waiting.push_back(wref); peer.channel = wref; - wref->check(&reply.header); + wref->check(reply); } break; @@ -371,7 +371,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea auto cres = peer.tempStorage.copy(pref); if (auto r = std::get_if<Ref>(&cres)) { if (auto acc = ChannelAccept::load(*r)) { - reply.header.push_back({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ TransportHeader::Type::Acknowledged, pref }); peer.channel.emplace<Stored<Channel>>(acc->data->channel()); } } @@ -411,7 +411,7 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o void Server::Peer::updateIdentity(ReplyBuilder & reply) { if (holds_alternative<shared_ptr<WaitingRef>>(identity)) - if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(&reply.header)) + if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply)) if (auto id = Identity::load(*ref)) { identity.emplace<Identity>(*id); if (lpeer) @@ -428,25 +428,25 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) auto req = Channel::generateRequest(tempStorage, server.self, std::get<Identity>(identity)); channel.emplace<Stored<ChannelRequest>>(req); - reply.header.push_back({ TransportHeader::Type::ChannelRequest, req.ref }); - reply.body.push_back(*req.ref); - reply.body.push_back(*req->data.ref); - reply.body.push_back(*req->data->key.ref); + reply.header({ TransportHeader::Type::ChannelRequest, req.ref }); + reply.body(req.ref); + reply.body(req->data.ref); + reply.body(req->data->key.ref); for (const auto & sig : req->sigs) - reply.body.push_back(*sig.ref); + reply.body(sig.ref); } if (holds_alternative<shared_ptr<WaitingRef>>(channel)) { - if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(&reply.header)) { + if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) { if (auto req = Stored<ChannelRequest>::load(*ref)) { if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), *req)) { channel.emplace<Stored<ChannelAccept>>(*acc); - reply.header.push_back({ TransportHeader::Type::ChannelAccept, acc->ref }); - reply.body.push_back(*acc->ref); - reply.body.push_back(*acc.value()->data.ref); - reply.body.push_back(*acc.value()->data->key.ref); + reply.header({ TransportHeader::Type::ChannelAccept, acc->ref }); + reply.body(acc->ref); + reply.body(acc.value()->data.ref); + reply.body(acc.value()->data->key.ref); for (const auto & sig : acc.value()->sigs) - reply.body.push_back(*sig.ref); + reply.body(sig.ref); } else { channel = monostate(); } @@ -457,7 +457,34 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } } -optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request) + +void ReplyBuilder::header(TransportHeader::Item && item) +{ + for (const auto & x : mheader) + if (x.type == item.type && x.value == item.value) + return; + mheader.emplace_back(std::move(item)); +} + +void ReplyBuilder::body(const Ref & ref) +{ + for (const auto & x : mbody) + if (x.digest() == ref.digest()) + return; + mbody.push_back(ref); +} + +vector<Object> ReplyBuilder::body() const +{ + vector<Object> res; + res.reserve(mbody.size()); + for (const Ref & ref : mbody) + res.push_back(*ref); + return res; +} + + +optional<Ref> WaitingRef::check(ReplyBuilder & reply) { if (auto r = storage.ref(ref.digest())) return *r; @@ -467,9 +494,8 @@ optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request) return *r; missing = std::get<vector<Digest>>(res); - if (request) - for (const auto & d : missing) - request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + for (const auto & d : missing) + reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); return nullopt; } diff --git a/src/network.h b/src/network.h index 07b5363..13bb031 100644 --- a/src/network.h +++ b/src/network.h @@ -30,6 +30,8 @@ using chrono::steady_clock; namespace erebos { +class ReplyBuilder; + struct Server::Peer { Peer(const Peer &) = delete; @@ -54,8 +56,8 @@ struct Server::Peer shared_ptr<erebos::Peer::Priv> lpeer = nullptr; void send(const struct TransportHeader &, const vector<Object> &) const; - void updateIdentity(struct ReplyBuilder &); - void updateChannel(struct ReplyBuilder &); + void updateIdentity(ReplyBuilder &); + void updateChannel(ReplyBuilder &); }; struct Peer::Priv : enable_shared_from_this<Peer::Priv> @@ -103,6 +105,20 @@ struct TransportHeader const vector<Item> items; }; +class ReplyBuilder +{ +public: + void header(TransportHeader::Item &&); + void body(const Ref &); + + const vector<TransportHeader::Item> & header() const { return mheader; } + vector<Object> body() const; + +private: + vector<TransportHeader::Item> mheader; + vector<Ref> mbody; +}; + struct WaitingRef { const Storage storage; @@ -110,13 +126,7 @@ struct WaitingRef const Server::Peer & peer; vector<Digest> missing; - optional<Ref> check(vector<TransportHeader::Item> * request = nullptr); -}; - -struct ReplyBuilder -{ - vector<TransportHeader::Item> header; - vector<Object> body; + optional<Ref> check(ReplyBuilder &); }; struct Server::Priv |