diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/network.cpp | 156 | ||||
-rw-r--r-- | src/network/protocol.cpp | 39 | ||||
-rw-r--r-- | src/network/protocol.h | 4 |
3 files changed, 100 insertions, 99 deletions
diff --git a/src/network.cpp b/src/network.cpp index ef06d6e..725ac50 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -115,7 +115,7 @@ void Server::addPeer(const string & node, const string & service) const { shared_lock lock(p->selfMutex); header.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() }); + NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() }); } peer.send(header, {}, false); return; @@ -234,7 +234,7 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const if (auto speer = p->speer.lock()) { NetworkProtocol::Header header({ { NetworkProtocol::Header::Type::ServiceType, uuid }, - { NetworkProtocol::Header::Type::ServiceRef, ref }, + { NetworkProtocol::Header::Type::ServiceRef, ref.digest() }, }); speer->send(header, { obj }, true); return true; @@ -442,6 +442,8 @@ void Server::Priv::doListen() void Server::Priv::doAnnounce() { + auto pst = self.ref()->storage().derivePartialStorage(); + unique_lock lock(dataMutex); auto lastAnnounce = steady_clock::now() - announceInterval; @@ -451,10 +453,10 @@ void Server::Priv::doAnnounce() if (lastAnnounce + announceInterval < now) { shared_lock slock(selfMutex); NetworkProtocol::Header header({ - { NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() } + { NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest() } }); - vector<uint8_t> bytes = header.toObject().encode(); + vector<uint8_t> bytes = header.toObject(pst).encode(); for (const auto & in : bcastAddresses) { sockaddr_in sin = {}; @@ -544,56 +546,55 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head for (auto & item : header.items) { switch (item.type) { - case NetworkProtocol::Header::Type::Acknowledged: - if (auto pref = std::get<PartialRef>(item.value)) { - if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && - std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest()) - peer.finalizeChannel(reply, - std::get<Stored<ChannelAccept>>(peer.channel)->data->channel()); - } + case NetworkProtocol::Header::Type::Acknowledged: { + auto dgst = std::get<Digest>(item.value); + if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && + std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst) + peer.finalizeChannel(reply, + std::get<Stored<ChannelAccept>>(peer.channel)->data->channel()); break; + } case NetworkProtocol::Header::Type::DataRequest: { - auto pref = std::get<PartialRef>(item.value); + auto dgst = std::get<Digest>(item.value); if (holds_alternative<unique_ptr<Channel>>(peer.channel) || - plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { - if (auto ref = peer.tempStorage.ref(pref.digest())) { - NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref }; - reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref }); + plaintextRefs.find(dgst) != plaintextRefs.end()) { + if (auto ref = peer.tempStorage.ref(dgst)) { + reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() }); reply.body(*ref); } } break; } - case NetworkProtocol::Header::Type::DataResponse: - if (auto pref = std::get<PartialRef>(item.value)) { - reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); - for (auto & pwref : waiting) { - if (auto wref = pwref.lock()) { - if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != - wref->missing.end()) { - if (wref->check(reply)) - pwref.reset(); - } + case NetworkProtocol::Header::Type::DataResponse: { + auto dgst = std::get<Digest>(item.value); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + for (auto & pwref : waiting) { + if (auto wref = pwref.lock()) { + if (std::find(wref->missing.begin(), wref->missing.end(), dgst) != + wref->missing.end()) { + if (wref->check(reply)) + pwref.reset(); } } - waiting.erase(std::remove_if(waiting.begin(), waiting.end(), - [](auto & wref) { return wref.expired(); }), waiting.end()); } + waiting.erase(std::remove_if(waiting.begin(), waiting.end(), + [](auto & wref) { return wref.expired(); }), waiting.end()); break; + } case NetworkProtocol::Header::Type::AnnounceSelf: { - auto pref = std::get<PartialRef>(item.value); - if (pref.digest() == self.ref()->digest()) + auto dgst = std::get<Digest>(item.value); + if (dgst == self.ref()->digest()) break; if (holds_alternative<monostate>(peer.identity)) { - reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()}); + reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest()}); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, - .ref = pref, + .ref = peer.partStorage.ref(dgst), .peer = peer, .missing = {}, }); @@ -606,12 +607,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head case NetworkProtocol::Header::Type::AnnounceUpdate: if (holds_alternative<Identity>(peer.identity)) { - auto pref = std::get<PartialRef>(item.value); - reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); + auto dgst = std::get<Digest>(item.value); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, - .ref = pref, + .ref = peer.partStorage.ref(dgst), .peer = peer, .missing = {}, }); @@ -621,46 +622,46 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head } break; - case NetworkProtocol::Header::Type::ChannelRequest: - if (auto pref = std::get<PartialRef>(item.value)) { - reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); + case NetworkProtocol::Header::Type::ChannelRequest: { + auto dgst = std::get<Digest>(item.value); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); - if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && - std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest()) - break; + if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && + std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst) + break; - if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) - break; + if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) + break; - shared_ptr<WaitingRef> wref(new WaitingRef { - .storage = peer.tempStorage, - .ref = pref, - .peer = peer, - .missing = {}, - }); - waiting.push_back(wref); - peer.channel = wref; - wref->check(reply); - } + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = peer.partStorage.ref(dgst), + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.channel = wref; + wref->check(reply); break; + } - case NetworkProtocol::Header::Type::ChannelAccept: - if (auto pref = std::get<PartialRef>(item.value)) { - if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && - std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest()) - break; - - auto cres = peer.tempStorage.copy(pref); - if (auto r = std::get_if<Ref>(&cres)) { - auto acc = ChannelAccept::load(*r); - if (holds_alternative<Identity>(peer.identity) && - acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { - reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); - peer.finalizeChannel(reply, acc.data->channel()); - } + case NetworkProtocol::Header::Type::ChannelAccept: { + auto dgst = std::get<Digest>(item.value); + if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && + std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst) + break; + + auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); + if (auto r = std::get_if<Ref>(&cres)) { + auto acc = ChannelAccept::load(*r); + if (holds_alternative<Identity>(peer.identity) && + acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { + reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); + peer.finalizeChannel(reply, acc.data->channel()); } } break; + } case NetworkProtocol::Header::Type::ServiceType: if (!serviceType) @@ -677,9 +678,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head if (!serviceType) break; - auto pref = std::get<PartialRef>(item.value); + auto dgst = std::get<Digest>(item.value); + auto pref = peer.partStorage.ref(dgst); if (pref) - reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); + reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, @@ -706,10 +708,10 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) vector<NetworkProtocol::Header::Item> hitems; for (const auto & r : self.refs()) hitems.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r }); + NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); for (const auto & r : self.updates()) hitems.push_back(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r }); + NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); NetworkProtocol::Header header(hitems); @@ -723,7 +725,7 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj { vector<uint8_t> data, part, out; - part = header.toObject().encode(); + part = header.toObject(partStorage).encode(); data.insert(data.end(), part.begin(), part.end()); for (const auto & obj : objs) { part = obj.encode(); @@ -786,7 +788,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) auto req = Channel::generateRequest(tempStorage, server.self, std::get<Identity>(identity)); channel.emplace<Stored<ChannelRequest>>(req); - reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() }); + reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() }); reply.body(req.ref()); reply.body(req->data.ref()); reply.body(req->data->key.ref()); @@ -801,7 +803,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) req->isSignedBy(std::get<Identity>(identity).keyMessage())) { if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) { channel.emplace<Stored<ChannelAccept>>(*acc); - reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() }); + reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() }); reply.body(acc->ref()); reply.body(acc.value()->data.ref()); reply.body(acc.value()->data->key.ref()); @@ -824,10 +826,10 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch) vector<NetworkProtocol::Header::Item> hitems; for (const auto & r : server.self.refs()) reply.header(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r }); + NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); for (const auto & r : server.self.updates()) reply.header(NetworkProtocol::Header::Item { - NetworkProtocol::Header::Type::AnnounceUpdate, r }); + NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() }); } void Server::Peer::updateService(ReplyBuilder & reply) @@ -926,7 +928,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply) return r; for (const auto & d : missing) - reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) }); + reply.header({ NetworkProtocol::Header::Type::DataRequest, d }); return nullopt; } diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index c2c6c5d..fb3a5ea 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -215,9 +215,8 @@ bool NetworkProtocol::Header::Item::operator==(const Item & other) const if (value.index() != other.value.index()) return false; - if (holds_alternative<PartialRef>(value)) - return std::get<PartialRef>(value).digest() == - std::get<PartialRef>(other.value).digest(); + if (holds_alternative<Digest>(value)) + return std::get<Digest>(value) == std::get<Digest>(other.value); if (holds_alternative<UUID>(value)) return std::get<UUID>(value) == std::get<UUID>(other.value); @@ -242,43 +241,43 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::Acknowledged, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "REQ") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::DataRequest, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "RSP") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::DataResponse, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "ANN") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::AnnounceSelf, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "ANU") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::AnnounceUpdate, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "CRQ") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::ChannelRequest, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "CAC") { if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::ChannelAccept, - .value = *ref, + .value = ref->digest(), }); } else if (item.name == "STP") { if (auto val = item.asUUID()) @@ -290,7 +289,7 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj if (auto ref = item.asRef()) items.emplace_back(Item { .type = Type::ServiceRef, - .value = *ref, + .value = ref->digest(), }); } } @@ -298,38 +297,38 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj return NetworkProtocol::Header(items); } -PartialObject NetworkProtocol::Header::toObject() const +PartialObject NetworkProtocol::Header::toObject(const PartialStorage & st) const { vector<PartialRecord::Item> ritems; for (const auto & item : items) { switch (item.type) { case Type::Acknowledged: - ritems.emplace_back("ACK", std::get<PartialRef>(item.value)); + ritems.emplace_back("ACK", st.ref(std::get<Digest>(item.value))); break; case Type::DataRequest: - ritems.emplace_back("REQ", std::get<PartialRef>(item.value)); + ritems.emplace_back("REQ", st.ref(std::get<Digest>(item.value))); break; case Type::DataResponse: - ritems.emplace_back("RSP", std::get<PartialRef>(item.value)); + ritems.emplace_back("RSP", st.ref(std::get<Digest>(item.value))); break; case Type::AnnounceSelf: - ritems.emplace_back("ANN", std::get<PartialRef>(item.value)); + ritems.emplace_back("ANN", st.ref(std::get<Digest>(item.value))); break; case Type::AnnounceUpdate: - ritems.emplace_back("ANU", std::get<PartialRef>(item.value)); + ritems.emplace_back("ANU", st.ref(std::get<Digest>(item.value))); break; case Type::ChannelRequest: - ritems.emplace_back("CRQ", std::get<PartialRef>(item.value)); + ritems.emplace_back("CRQ", st.ref(std::get<Digest>(item.value))); break; case Type::ChannelAccept: - ritems.emplace_back("CAC", std::get<PartialRef>(item.value)); + ritems.emplace_back("CAC", st.ref(std::get<Digest>(item.value))); break; case Type::ServiceType: @@ -337,7 +336,7 @@ PartialObject NetworkProtocol::Header::toObject() const break; case Type::ServiceRef: - ritems.emplace_back("SRF", std::get<PartialRef>(item.value)); + ritems.emplace_back("SRF", st.ref(std::get<Digest>(item.value))); break; } } diff --git a/src/network/protocol.h b/src/network/protocol.h index 8aa22a2..4794ba6 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -107,7 +107,7 @@ struct NetworkProtocol::Header struct Item { const Type type; - const variant<PartialRef, UUID> value; + const variant<Digest, UUID> value; bool operator==(const Item &) const; bool operator!=(const Item & other) const { return !(*this == other); } @@ -116,7 +116,7 @@ struct NetworkProtocol::Header Header(const vector<Item> & items): items(items) {} static optional<Header> load(const PartialRef &); static optional<Header> load(const PartialObject &); - PartialObject toObject() const; + PartialObject toObject(const PartialStorage &) const; const vector<Item> items; }; |