diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2022-04-29 23:41:53 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2022-04-29 23:41:53 +0200 |
commit | 92564f487083e5be8f6c704521b74851a6cdeaf9 (patch) | |
tree | 0c7494d004065f6d816bdf14ef5f35c0d064a362 /src | |
parent | 8b180ffd0551831931cdb61c925987f9014adad8 (diff) |
Network: process and acknowledge identity updates
Diffstat (limited to 'src')
-rw-r--r-- | src/network.cpp | 95 | ||||
-rw-r--r-- | src/network.h | 2 |
2 files changed, 78 insertions, 19 deletions
diff --git a/src/network.cpp b/src/network.cpp index 84c3083..2e3cf3b 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -13,6 +13,8 @@ #include <net/if.h> #include <unistd.h> +using std::get; +using std::get_if; using std::holds_alternative; using std::move; using std::runtime_error; @@ -398,6 +400,7 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in & paddr) .server = *this, .addr = paddr, .identity = monostate(), + .identityUpdates = {}, .channel = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), @@ -461,22 +464,37 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea if (pref.digest() == self.ref()->digest()) break; - if (holds_alternative<monostate>(peer.identity)) + if (holds_alternative<monostate>(peer.identity)) { reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); - shared_ptr<WaitingRef> wref(new WaitingRef { - .storage = peer.tempStorage, - .ref = pref, - .peer = peer, - .missing = {}, - }); - waiting.push_back(wref); - peer.identity = wref; - wref->check(reply); + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(reply); + } break; } case TransportHeader::Type::AnnounceUpdate: + if (holds_alternative<Identity>(peer.identity)) { + auto pref = std::get<PartialRef>(item.value); + reply.header({ TransportHeader::Type::Acknowledged, pref }); + + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identityUpdates.push_back(wref); + wref->check(reply); + } break; case TransportHeader::Type::ChannelRequest: @@ -554,12 +572,18 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) { scoped_lock lock(dataMutex); if (auto id = head->identity()) { - if (id->ref()->digest() != self.ref()->digest()) { + if (*id != self) { self = *id; - TransportHeader header({ - { TransportHeader::Type::AnnounceSelf, *self.ref() } - }); + vector<TransportHeader::Item> hitems; + for (const auto & r : self.refs()) + hitems.push_back(TransportHeader::Item { + TransportHeader::Type::AnnounceUpdate, r }); + for (const auto & r : self.updates()) + hitems.push_back(TransportHeader::Item { + TransportHeader::Type::AnnounceUpdate, r }); + + TransportHeader header(hitems); for (const auto & peer : peers) peer->send(header, { **self.ref() }, false); @@ -590,15 +614,40 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o (sockaddr *) &addr, sizeof(addr)); } -void Server::Peer::updateIdentity(ReplyBuilder & reply) +void Server::Peer::updateIdentity(ReplyBuilder &) { - if (holds_alternative<shared_ptr<WaitingRef>>(identity)) - if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply)) + if (holds_alternative<shared_ptr<WaitingRef>>(identity)) { + if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check()) if (auto id = Identity::load(*ref)) { identity.emplace<Identity>(*id); if (lpeer) lpeer->notifyWatchers(); } + } + else if (holds_alternative<Identity>(identity)) { + if (!identityUpdates.empty()) { + decltype(identityUpdates) keep; + vector<Stored<Signed<IdentityData>>> updates; + + for (auto wref : identityUpdates) { + if (auto ref = wref->check()) + updates.push_back(Stored<Signed<IdentityData>>::load(*ref)); + else + keep.push_back(move(wref)); + } + + identityUpdates = move(keep); + + if (!updates.empty()) { + auto nid = get<Identity>(identity).update(updates); + if (nid != get<Identity>(identity)) { + identity = move(nid); + if (lpeer) + lpeer->notifyWatchers(); + } + } + } + } } void Server::Peer::updateChannel(ReplyBuilder & reply) @@ -716,16 +765,24 @@ vector<Object> ReplyBuilder::body() const } -optional<Ref> WaitingRef::check(ReplyBuilder & reply) +optional<Ref> WaitingRef::check() { if (auto r = storage.ref(ref.digest())) return *r; auto res = storage.copy(ref); - if (auto r = std::get_if<Ref>(&res)) + if (auto r = get_if<Ref>(&res)) return *r; missing = std::get<vector<Digest>>(res); + return nullopt; +} + +optional<Ref> WaitingRef::check(ReplyBuilder & reply) +{ + if (auto r = check()) + return r; + for (const auto & d : missing) reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); diff --git a/src/network.h b/src/network.h index 40c06bf..8e99df1 100644 --- a/src/network.h +++ b/src/network.h @@ -45,6 +45,7 @@ struct Server::Peer variant<monostate, shared_ptr<struct WaitingRef>, Identity> identity; + vector<shared_ptr<WaitingRef>> identityUpdates; variant<monostate, Stored<ChannelRequest>, @@ -133,6 +134,7 @@ struct WaitingRef const Server::Peer & peer; vector<Digest> missing; + optional<Ref> check(); optional<Ref> check(ReplyBuilder &); }; |