From 09c1e987f6ddfdefb26b6a5f359363303720d5fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 26 Aug 2023 21:38:36 +0200 Subject: Network: header item as variant of per-header-type structs --- src/network.cpp | 173 +++++++++++++++++++++++------------------------ src/network/protocol.cpp | 130 ++++++++++++----------------------- src/network/protocol.h | 28 +++++--- 3 files changed, 144 insertions(+), 187 deletions(-) diff --git a/src/network.cpp b/src/network.cpp index 8c181cf..7a5a804 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -114,8 +114,7 @@ void Server::addPeer(const string & node, const string & service) const vector header; { shared_lock lock(p->selfMutex); - header.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() }); + header.push_back(NetworkProtocol::Header::AnnounceSelf { p->self.ref()->digest() }); } peer.connection.send(peer.partStorage, header, {}, false); return; @@ -226,8 +225,8 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const { if (auto speer = p->speer.lock()) { NetworkProtocol::Header header({ - { NetworkProtocol::Header::Type::ServiceType, uuid }, - { NetworkProtocol::Header::Type::ServiceRef, ref.digest() }, + NetworkProtocol::Header::ServiceType { uuid }, + NetworkProtocol::Header::ServiceRef { ref.digest() }, }); speer->connection.send(speer->partStorage, header, { obj }, true); return true; @@ -417,7 +416,7 @@ void Server::Priv::doAnnounce() if (lastAnnounce + announceInterval < now) { shared_lock slock(selfMutex); NetworkProtocol::Header header({ - { NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest() } + NetworkProtocol::Header::AnnounceSelf { self.ref()->digest() }, }); vector bytes = header.toObject(pst).encode(); @@ -506,32 +505,29 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head optional serviceType; - for (auto & item : header.items) { - switch (item.type) { - case NetworkProtocol::Header::Type::Acknowledged: { - auto dgst = std::get(item.value); + for (const auto & item : header.items) { + if (const auto * ack = get_if(&item)) { + const auto & dgst = ack->value; if (holds_alternative>(peer.connection.channel()) && std::get>(peer.connection.channel()).ref().digest() == dgst) peer.finalizeChannel(reply, std::get>(peer.connection.channel())->data->channel()); - break; } - case NetworkProtocol::Header::Type::DataRequest: { - auto dgst = std::get(item.value); + else if (const auto * req = get_if(&item)) { + const auto & dgst = req->value; if (holds_alternative>(peer.connection.channel()) || plaintextRefs.find(dgst) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(dgst)) { - reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() }); + reply.header({ NetworkProtocol::Header::DataResponse { ref->digest() } }); reply.body(*ref); } } - break; } - case NetworkProtocol::Header::Type::DataResponse: { - auto dgst = std::get(item.value); - reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + else if (const auto * rsp = get_if(&item)) { + const auto & dgst = rsp->value; + reply.header({ NetworkProtocol::Header::Acknowledged { dgst } }); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), dgst) != @@ -543,16 +539,13 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head } waiting.erase(std::remove_if(waiting.begin(), waiting.end(), [](auto & wref) { return wref.expired(); }), waiting.end()); - break; } - case NetworkProtocol::Header::Type::AnnounceSelf: { - auto dgst = std::get(item.value); - if (dgst == self.ref()->digest()) - break; - - if (holds_alternative(peer.identity)) { - reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest()}); + else if (const auto * ann = get_if(&item)) { + const auto & dgst = ann->value; + if (dgst != self.ref()->digest() && + holds_alternative(peer.identity)) { + reply.header({ NetworkProtocol::Header::AnnounceSelf { self.ref()->digest() }}); shared_ptr wref(new WaitingRef { .storage = peer.tempStorage, @@ -563,13 +556,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head peer.identity = wref; wref->check(reply); } - break; } - case NetworkProtocol::Header::Type::AnnounceUpdate: + else if (const auto * anu = get_if(&item)) { if (holds_alternative(peer.identity)) { - auto dgst = std::get(item.value); - reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + const auto & dgst = anu->value; + reply.header({ NetworkProtocol::Header::Acknowledged { dgst } }); shared_ptr wref(new WaitingRef { .storage = peer.tempStorage, @@ -580,76 +572,81 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head peer.identityUpdates.push_back(wref); wref->check(reply); } - break; + } - case NetworkProtocol::Header::Type::ChannelRequest: { - auto dgst = std::get(item.value); - reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + else if (const auto * req = get_if(&item)) { + const auto & dgst = req->value; + reply.header({ NetworkProtocol::Header::Acknowledged { dgst } }); if (holds_alternative>(peer.connection.channel()) && - std::get>(peer.connection.channel()).ref().digest() < dgst) - break; + std::get>(peer.connection.channel()).ref().digest() < dgst) { + // TODO: reject request with lower priority + } - if (holds_alternative>(peer.connection.channel())) - break; + else if (holds_alternative>(peer.connection.channel())) { + // TODO: reject when we already sent accept + } - shared_ptr wref(new WaitingRef { - .storage = peer.tempStorage, - .ref = peer.partStorage.ref(dgst), - .missing = {}, - }); - waiting.push_back(wref); - peer.connection.channel() = wref; - wref->check(reply); - break; + else { + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = peer.partStorage.ref(dgst), + .missing = {}, + }); + waiting.push_back(wref); + peer.connection.channel() = wref; + wref->check(reply); + } } - case NetworkProtocol::Header::Type::ChannelAccept: { - auto dgst = std::get(item.value); + else if (const auto * acc = get_if(&item)) { + const auto & dgst = acc->value; if (holds_alternative>(peer.connection.channel()) && - std::get>(peer.connection.channel()).ref().digest() < dgst) - break; - - auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); - if (auto r = std::get_if(&cres)) { - auto acc = ChannelAccept::load(*r); - if (holds_alternative(peer.identity) && - acc.isSignedBy(std::get(peer.identity).keyMessage())) { - reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); - peer.finalizeChannel(reply, acc.data->channel()); + std::get>(peer.connection.channel()).ref().digest() < dgst) { + // TODO: reject request with lower priority + } + + else { + auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); + if (auto r = get_if(&cres)) { + auto acc = ChannelAccept::load(*r); + if (holds_alternative(peer.identity) && + acc.isSignedBy(std::get(peer.identity).keyMessage())) { + reply.header({ NetworkProtocol::Header::Acknowledged { dgst } }); + peer.finalizeChannel(reply, acc.data->channel()); + } } } - break; } - case NetworkProtocol::Header::Type::ServiceType: + else if (const auto * stype = get_if(&item)) { if (!serviceType) - serviceType = std::get(item.value); - break; + serviceType = stype->value; + } - case NetworkProtocol::Header::Type::ServiceRef: + else if (const auto * sref = get_if(&item)) { if (!serviceType) for (auto & item : header.items) - if (item.type == NetworkProtocol::Header::Type::ServiceType) { - serviceType = std::get(item.value); + if (const auto * stype = get_if(&item)) { + serviceType = stype->value; break; } - if (!serviceType) - break; - auto dgst = std::get(item.value); - auto pref = peer.partStorage.ref(dgst); - if (pref) - reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + if (serviceType) { + const auto & dgst = sref->value; + auto pref = peer.partStorage.ref(dgst); + if (pref) + reply.header({ NetworkProtocol::Header::Acknowledged { dgst } }); - shared_ptr wref(new WaitingRef { - .storage = peer.tempStorage, - .ref = pref, - .missing = {}, - }); - waiting.push_back(wref); - peer.serviceQueue.emplace_back(*serviceType, wref); - wref->check(reply); + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .missing = {}, + }); + waiting.push_back(wref); + peer.serviceQueue.emplace_back(*serviceType, wref); + wref->check(reply); + } } } } @@ -665,11 +662,9 @@ void Server::Priv::handleLocalHeadChange(const Head & head) vector hitems; for (const auto & r : self.refs()) - hitems.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); + hitems.push_back(NetworkProtocol::Header::AnnounceUpdate { r.digest() }); for (const auto & r : self.updates()) - hitems.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); + hitems.push_back(NetworkProtocol::Header::AnnounceUpdate { r.digest() }); NetworkProtocol::Header header(hitems); @@ -724,7 +719,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) auto req = Channel::generateRequest(tempStorage, server.self, std::get(identity)); connection.channel().emplace>(req); - reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() }); + reply.header({ NetworkProtocol::Header::ChannelRequest { req.ref().digest() } }); reply.body(req.ref()); reply.body(req->data.ref()); reply.body(req->data->key.ref()); @@ -739,7 +734,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) req->isSignedBy(std::get(identity).keyMessage())) { if (auto acc = Channel::acceptRequest(server.self, std::get(identity), req)) { connection.channel().emplace>(*acc); - reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() }); + reply.header({ NetworkProtocol::Header::ChannelAccept { acc->ref().digest() } }); reply.body(acc->ref()); reply.body(acc.value()->data.ref()); reply.body(acc.value()->data->key.ref()); @@ -761,11 +756,9 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr ch) vector hitems; for (const auto & r : server.self.refs()) - reply.header(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); + reply.header(NetworkProtocol::Header::AnnounceUpdate { r.digest() }); for (const auto & r : server.self.updates()) - reply.header(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); + reply.header(NetworkProtocol::Header::AnnounceUpdate { r.digest() }); } void Server::Peer::updateService(ReplyBuilder & reply) @@ -848,7 +841,7 @@ optional WaitingRef::check(ReplyBuilder & reply) return r; for (const auto & d : missing) - reply.header({ NetworkProtocol::Header::Type::DataRequest, d }); + reply.header({ NetworkProtocol::Header::DataRequest { d } }); return nullopt; } diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index f38267f..ede7023 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -9,11 +9,12 @@ #include #include +using std::get_if; using std::holds_alternative; using std::move; using std::nullopt; -using std::runtime_error; using std::scoped_lock; +using std::visit; namespace erebos { @@ -327,21 +328,16 @@ void NetworkProtocol::Connection::trySendOutQueue() /* Header */ /******************************************************************************/ -bool NetworkProtocol::Header::Item::operator==(const Item & other) const +bool operator==(const NetworkProtocol::Header::Item & left, + const NetworkProtocol::Header::Item & right) { - if (type != other.type) + if (left.index() != right.index()) return false; - if (value.index() != other.value.index()) - return false; - - if (holds_alternative(value)) - return std::get(value) == std::get(other.value); - - if (holds_alternative(value)) - return std::get(value) == std::get(other.value); - - throw runtime_error("unhandled network header item type"); + return visit([&](auto && arg) { + using T = std::decay_t; + return arg.value == std::get(right).value; + }, left); } optional NetworkProtocol::Header::load(const PartialRef & ref) @@ -359,58 +355,31 @@ optional NetworkProtocol::Header::load(const PartialObj for (const auto & item : rec->items()) { if (item.name == "ACK") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::Acknowledged, - .value = ref->digest(), - }); + items.emplace_back(Acknowledged { ref->digest() }); } else if (item.name == "REQ") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::DataRequest, - .value = ref->digest(), - }); + items.emplace_back(DataRequest { ref->digest() }); } else if (item.name == "RSP") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::DataResponse, - .value = ref->digest(), - }); + items.emplace_back(DataResponse { ref->digest() }); } else if (item.name == "ANN") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::AnnounceSelf, - .value = ref->digest(), - }); + items.emplace_back(AnnounceSelf { ref->digest() }); } else if (item.name == "ANU") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::AnnounceUpdate, - .value = ref->digest(), - }); + items.emplace_back(AnnounceUpdate { ref->digest() }); } else if (item.name == "CRQ") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ChannelRequest, - .value = ref->digest(), - }); + items.emplace_back(ChannelRequest { ref->digest() }); } else if (item.name == "CAC") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ChannelAccept, - .value = ref->digest(), - }); + items.emplace_back(ChannelAccept { ref->digest() }); } else if (item.name == "STP") { if (auto val = item.asUUID()) - items.emplace_back(Item { - .type = Type::ServiceType, - .value = *val, - }); + items.emplace_back(ServiceType { *val }); } else if (item.name == "SRF") { if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ServiceRef, - .value = ref->digest(), - }); + items.emplace_back(ServiceRef { ref->digest() }); } } @@ -422,43 +391,32 @@ PartialObject NetworkProtocol::Header::toObject(const PartialStorage & st) const vector ritems; for (const auto & item : items) { - switch (item.type) { - case Type::Acknowledged: - ritems.emplace_back("ACK", st.ref(std::get(item.value))); - break; - - case Type::DataRequest: - ritems.emplace_back("REQ", st.ref(std::get(item.value))); - break; - - case Type::DataResponse: - ritems.emplace_back("RSP", st.ref(std::get(item.value))); - break; - - case Type::AnnounceSelf: - ritems.emplace_back("ANN", st.ref(std::get(item.value))); - break; - - case Type::AnnounceUpdate: - ritems.emplace_back("ANU", st.ref(std::get(item.value))); - break; - - case Type::ChannelRequest: - ritems.emplace_back("CRQ", st.ref(std::get(item.value))); - break; - - case Type::ChannelAccept: - ritems.emplace_back("CAC", st.ref(std::get(item.value))); - break; - - case Type::ServiceType: - ritems.emplace_back("STP", std::get(item.value)); - break; - - case Type::ServiceRef: - ritems.emplace_back("SRF", st.ref(std::get(item.value))); - break; - } + if (const auto * ptr = get_if(&item)) + ritems.emplace_back("ACK", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("REQ", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("RSP", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("ANN", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("ANU", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("CRQ", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("CAC", st.ref(ptr->value)); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("STP", ptr->value); + + else if (const auto * ptr = get_if(&item)) + ritems.emplace_back("SRF", st.ref(ptr->value)); } return PartialObject(PartialRecord(std::move(ritems))); diff --git a/src/network/protocol.h b/src/network/protocol.h index c5803ce..df29c05 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -106,7 +106,17 @@ struct NetworkProtocol::ConnectionReadReady { Connection::Id id; }; struct NetworkProtocol::Header { - enum class Type { + struct Acknowledged { Digest value; }; + struct DataRequest { Digest value; }; + struct DataResponse { Digest value; }; + struct AnnounceSelf { Digest value; }; + struct AnnounceUpdate { Digest value; }; + struct ChannelRequest { Digest value; }; + struct ChannelAccept { Digest value; }; + struct ServiceType { UUID value; }; + struct ServiceRef { Digest value; }; + + using Item = variant< Acknowledged, DataRequest, DataResponse, @@ -115,16 +125,7 @@ struct NetworkProtocol::Header ChannelRequest, ChannelAccept, ServiceType, - ServiceRef, - }; - - struct Item { - const Type type; - const variant value; - - bool operator==(const Item &) const; - bool operator!=(const Item & other) const { return !(*this == other); } - }; + ServiceRef>; Header(const vector & items): items(items) {} static optional
load(const PartialRef &); @@ -134,6 +135,11 @@ struct NetworkProtocol::Header const vector items; }; +bool operator==(const NetworkProtocol::Header::Item &, const NetworkProtocol::Header::Item &); +inline bool operator!=(const NetworkProtocol::Header::Item & left, + const NetworkProtocol::Header::Item & right) +{ return not (left == right); } + class ReplyBuilder { public: -- cgit v1.2.3