diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-16 20:53:58 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-16 21:49:39 +0200 |
commit | b09e73f0abcc386719a2235cc3ae61fb1cbfc5ca (patch) | |
tree | 462085ac8fb592f4a2c07446058a7e761b51dfac /src/network.cpp | |
parent | 2ed8103ff1c0fca7372b3c3888f590ba41c525e6 (diff) |
Move network header definitions to protocol module
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 229 |
1 files changed, 45 insertions, 184 deletions
diff --git a/src/network.cpp b/src/network.cpp index 786e752..ef06d6e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -111,11 +111,11 @@ void Server::addPeer(const string & node, const string & service) const if (rp->ai_family == AF_INET6) { Peer & peer = p->getPeer(*(sockaddr_in6 *)rp->ai_addr); - vector<TransportHeader::Item> header; + vector<NetworkProtocol::Header::Item> header; { shared_lock lock(p->selfMutex); - header.push_back(TransportHeader::Item { - TransportHeader::Type::AnnounceSelf, *p->self.ref() }); + header.push_back(NetworkProtocol::Header::Item { + NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() }); } peer.send(header, {}, false); return; @@ -232,9 +232,9 @@ bool Peer::send(UUID uuid, const Object & obj) const bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const { if (auto speer = p->speer.lock()) { - TransportHeader header({ - { TransportHeader::Type::ServiceType, uuid }, - { TransportHeader::Type::ServiceRef, ref }, + NetworkProtocol::Header header({ + { NetworkProtocol::Header::Type::ServiceType, uuid }, + { NetworkProtocol::Header::Type::ServiceRef, ref }, }); speer->send(header, { obj }, true); return true; @@ -411,7 +411,7 @@ void Server::Priv::doListen() if (auto dec = PartialObject::decodePrefix(peer->partStorage, current->begin(), current->end())) { - if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { + if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) { auto pos = std::get<1>(*dec); while (auto cdec = PartialObject::decodePrefix(peer->partStorage, pos, current->end())) { @@ -430,7 +430,7 @@ void Server::Priv::doListen() peer->updateService(reply); if (!reply.header().empty()) - peer->send(TransportHeader(reply.header()), reply.body(), false); + peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false); peer->trySendOutQueue(); } @@ -450,8 +450,8 @@ void Server::Priv::doAnnounce() if (lastAnnounce + announceInterval < now) { shared_lock slock(selfMutex); - TransportHeader header({ - { TransportHeader::Type::AnnounceSelf, *self.ref() } + NetworkProtocol::Header header({ + { NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() } }); vector<uint8_t> bytes = header.toObject().encode(); @@ -534,7 +534,7 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn) return *peer; } -void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & header, ReplyBuilder & reply) +void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Header & header, ReplyBuilder & reply) { unordered_set<Digest> plaintextRefs; for (const auto & obj : collectStoredObjects(Stored<Object>::load(*self.ref()))) @@ -544,7 +544,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea for (auto & item : header.items) { switch (item.type) { - case TransportHeader::Type::Acknowledged: + case NetworkProtocol::Header::Type::Acknowledged: if (auto pref = std::get<PartialRef>(item.value)) { if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest()) @@ -553,22 +553,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea } break; - case TransportHeader::Type::DataRequest: { + case NetworkProtocol::Header::Type::DataRequest: { auto pref = std::get<PartialRef>(item.value); if (holds_alternative<unique_ptr<Channel>>(peer.channel) || plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { - TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - reply.header({ TransportHeader::Type::DataResponse, *ref }); + NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref }; + reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref }); reply.body(*ref); } } break; } - case TransportHeader::Type::DataResponse: + case NetworkProtocol::Header::Type::DataResponse: if (auto pref = std::get<PartialRef>(item.value)) { - reply.header({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); for (auto & pwref : waiting) { if (auto wref = pwref.lock()) { if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != @@ -583,13 +583,13 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea } break; - case TransportHeader::Type::AnnounceSelf: { + case NetworkProtocol::Header::Type::AnnounceSelf: { auto pref = std::get<PartialRef>(item.value); if (pref.digest() == self.ref()->digest()) break; if (holds_alternative<monostate>(peer.identity)) { - reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()}); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, @@ -604,10 +604,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; } - case TransportHeader::Type::AnnounceUpdate: + case NetworkProtocol::Header::Type::AnnounceUpdate: if (holds_alternative<Identity>(peer.identity)) { auto pref = std::get<PartialRef>(item.value); - reply.header({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, @@ -621,9 +621,9 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea } break; - case TransportHeader::Type::ChannelRequest: + case NetworkProtocol::Header::Type::ChannelRequest: if (auto pref = std::get<PartialRef>(item.value)) { - reply.header({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest()) @@ -644,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea } break; - case TransportHeader::Type::ChannelAccept: + case NetworkProtocol::Header::Type::ChannelAccept: if (auto pref = std::get<PartialRef>(item.value)) { if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest()) @@ -655,22 +655,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea auto acc = ChannelAccept::load(*r); if (holds_alternative<Identity>(peer.identity) && acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { - reply.header({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); peer.finalizeChannel(reply, acc.data->channel()); } } } break; - case TransportHeader::Type::ServiceType: + case NetworkProtocol::Header::Type::ServiceType: if (!serviceType) serviceType = std::get<UUID>(item.value); break; - case TransportHeader::Type::ServiceRef: + case NetworkProtocol::Header::Type::ServiceRef: if (!serviceType) for (auto & item : header.items) - if (item.type == TransportHeader::Type::ServiceType) { + if (item.type == NetworkProtocol::Header::Type::ServiceType) { serviceType = std::get<UUID>(item.value); break; } @@ -679,7 +679,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea auto pref = std::get<PartialRef>(item.value); if (pref) - reply.header({ TransportHeader::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, @@ -703,15 +703,15 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) if (*id != self) { self = *id; - vector<TransportHeader::Item> hitems; + vector<NetworkProtocol::Header::Item> hitems; for (const auto & r : self.refs()) - hitems.push_back(TransportHeader::Item { - TransportHeader::Type::AnnounceUpdate, r }); + hitems.push_back(NetworkProtocol::Header::Item { + NetworkProtocol::Header::Type::AnnounceUpdate, r }); for (const auto & r : self.updates()) - hitems.push_back(TransportHeader::Item { - TransportHeader::Type::AnnounceUpdate, r }); + hitems.push_back(NetworkProtocol::Header::Item { + NetworkProtocol::Header::Type::AnnounceUpdate, r }); - TransportHeader header(hitems); + NetworkProtocol::Header header(hitems); for (const auto & peer : peers) peer->send(header, { **self.ref() }, false); @@ -719,7 +719,7 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) } } -void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs, bool secure) +void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure) { vector<uint8_t> data, part, out; @@ -786,7 +786,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) auto req = Channel::generateRequest(tempStorage, server.self, std::get<Identity>(identity)); channel.emplace<Stored<ChannelRequest>>(req); - reply.header({ TransportHeader::Type::ChannelRequest, req.ref() }); + reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() }); reply.body(req.ref()); reply.body(req->data.ref()); reply.body(req->data->key.ref()); @@ -801,7 +801,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) req->isSignedBy(std::get<Identity>(identity).keyMessage())) { if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) { channel.emplace<Stored<ChannelAccept>>(*acc); - reply.header({ TransportHeader::Type::ChannelAccept, acc->ref() }); + reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() }); reply.body(acc->ref()); reply.body(acc.value()->data.ref()); reply.body(acc.value()->data->key.ref()); @@ -821,13 +821,13 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch) { channel.emplace<unique_ptr<Channel>>(move(ch)); - vector<TransportHeader::Item> hitems; + vector<NetworkProtocol::Header::Item> hitems; for (const auto & r : server.self.refs()) - reply.header(TransportHeader::Item { - TransportHeader::Type::AnnounceUpdate, r }); + reply.header(NetworkProtocol::Header::Item { + NetworkProtocol::Header::Type::AnnounceUpdate, r }); for (const auto & r : server.self.updates()) - reply.header(TransportHeader::Item { - TransportHeader::Type::AnnounceUpdate, r }); + reply.header(NetworkProtocol::Header::Item { + NetworkProtocol::Header::Type::AnnounceUpdate, r }); } void Server::Peer::updateService(ReplyBuilder & reply) @@ -881,7 +881,7 @@ void Server::Peer::trySendOutQueue() } -void ReplyBuilder::header(TransportHeader::Item && item) +void ReplyBuilder::header(NetworkProtocol::Header::Item && item) { for (const auto & x : mheader) if (x == item) @@ -926,146 +926,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply) return r; for (const auto & d : missing) - reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) }); return nullopt; } - - -bool TransportHeader::Item::operator==(const Item & other) const -{ - if (type != other.type) - return false; - - if (value.index() != other.value.index()) - return false; - - if (holds_alternative<PartialRef>(value)) - return std::get<PartialRef>(value).digest() == - std::get<PartialRef>(other.value).digest(); - - if (holds_alternative<UUID>(value)) - return std::get<UUID>(value) == std::get<UUID>(other.value); - - throw runtime_error("unhandled transport header item type"); -} - -optional<TransportHeader> TransportHeader::load(const PartialRef & ref) -{ - return load(*ref); -} - -optional<TransportHeader> TransportHeader::load(const PartialObject & obj) -{ - auto rec = obj.asRecord(); - if (!rec) - return nullopt; - - vector<Item> items; - for (const auto & item : rec->items()) { - if (item.name == "ACK") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::Acknowledged, - .value = *ref, - }); - } else if (item.name == "REQ") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::DataRequest, - .value = *ref, - }); - } else if (item.name == "RSP") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::DataResponse, - .value = *ref, - }); - } else if (item.name == "ANN") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::AnnounceSelf, - .value = *ref, - }); - } else if (item.name == "ANU") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::AnnounceUpdate, - .value = *ref, - }); - } else if (item.name == "CRQ") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ChannelRequest, - .value = *ref, - }); - } else if (item.name == "CAC") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ChannelAccept, - .value = *ref, - }); - } else if (item.name == "STP") { - if (auto val = item.asUUID()) - items.emplace_back(Item { - .type = Type::ServiceType, - .value = *val, - }); - } else if (item.name == "SRF") { - if (auto ref = item.asRef()) - items.emplace_back(Item { - .type = Type::ServiceRef, - .value = *ref, - }); - } - } - - return TransportHeader(items); -} - -PartialObject TransportHeader::toObject() const -{ - vector<PartialRecord::Item> ritems; - - for (const auto & item : items) { - switch (item.type) { - case Type::Acknowledged: - ritems.emplace_back("ACK", std::get<PartialRef>(item.value)); - break; - - case Type::DataRequest: - ritems.emplace_back("REQ", std::get<PartialRef>(item.value)); - break; - - case Type::DataResponse: - ritems.emplace_back("RSP", std::get<PartialRef>(item.value)); - break; - - case Type::AnnounceSelf: - ritems.emplace_back("ANN", std::get<PartialRef>(item.value)); - break; - - case Type::AnnounceUpdate: - ritems.emplace_back("ANU", std::get<PartialRef>(item.value)); - break; - - case Type::ChannelRequest: - ritems.emplace_back("CRQ", std::get<PartialRef>(item.value)); - break; - - case Type::ChannelAccept: - ritems.emplace_back("CAC", std::get<PartialRef>(item.value)); - break; - - case Type::ServiceType: - ritems.emplace_back("STP", std::get<UUID>(item.value)); - break; - - case Type::ServiceRef: - ritems.emplace_back("SRF", std::get<PartialRef>(item.value)); - break; - } - } - - return PartialObject(PartialRecord(std::move(ritems))); -} |