From 92564f487083e5be8f6c704521b74851a6cdeaf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Fri, 29 Apr 2022 23:41:53 +0200 Subject: Network: process and acknowledge identity updates --- src/network.cpp | 95 +++++++++++++++++++++++++++++++++++++++++++++------------ src/network.h | 2 ++ 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/network.cpp b/src/network.cpp index 84c3083..2e3cf3b 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -13,6 +13,8 @@ #include #include +using std::get; +using std::get_if; using std::holds_alternative; using std::move; using std::runtime_error; @@ -398,6 +400,7 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in & paddr) .server = *this, .addr = paddr, .identity = monostate(), + .identityUpdates = {}, .channel = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), @@ -461,22 +464,37 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea if (pref.digest() == self.ref()->digest()) break; - if (holds_alternative(peer.identity)) + if (holds_alternative(peer.identity)) { reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); - shared_ptr wref(new WaitingRef { - .storage = peer.tempStorage, - .ref = pref, - .peer = peer, - .missing = {}, - }); - waiting.push_back(wref); - peer.identity = wref; - wref->check(reply); + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(reply); + } break; } case TransportHeader::Type::AnnounceUpdate: + if (holds_alternative(peer.identity)) { + auto pref = std::get(item.value); + reply.header({ TransportHeader::Type::Acknowledged, pref }); + + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identityUpdates.push_back(wref); + wref->check(reply); + } break; case TransportHeader::Type::ChannelRequest: @@ -554,12 +572,18 @@ void Server::Priv::handleLocalHeadChange(const Head & head) { scoped_lock lock(dataMutex); if (auto id = head->identity()) { - if (id->ref()->digest() != self.ref()->digest()) { + if (*id != self) { self = *id; - TransportHeader header({ - { TransportHeader::Type::AnnounceSelf, *self.ref() } - }); + vector hitems; + for (const auto & r : self.refs()) + hitems.push_back(TransportHeader::Item { + TransportHeader::Type::AnnounceUpdate, r }); + for (const auto & r : self.updates()) + hitems.push_back(TransportHeader::Item { + TransportHeader::Type::AnnounceUpdate, r }); + + TransportHeader header(hitems); for (const auto & peer : peers) peer->send(header, { **self.ref() }, false); @@ -590,15 +614,40 @@ void Server::Peer::send(const TransportHeader & header, const vector & o (sockaddr *) &addr, sizeof(addr)); } -void Server::Peer::updateIdentity(ReplyBuilder & reply) +void Server::Peer::updateIdentity(ReplyBuilder &) { - if (holds_alternative>(identity)) - if (auto ref = std::get>(identity)->check(reply)) + if (holds_alternative>(identity)) { + if (auto ref = std::get>(identity)->check()) if (auto id = Identity::load(*ref)) { identity.emplace(*id); if (lpeer) lpeer->notifyWatchers(); } + } + else if (holds_alternative(identity)) { + if (!identityUpdates.empty()) { + decltype(identityUpdates) keep; + vector>> updates; + + for (auto wref : identityUpdates) { + if (auto ref = wref->check()) + updates.push_back(Stored>::load(*ref)); + else + keep.push_back(move(wref)); + } + + identityUpdates = move(keep); + + if (!updates.empty()) { + auto nid = get(identity).update(updates); + if (nid != get(identity)) { + identity = move(nid); + if (lpeer) + lpeer->notifyWatchers(); + } + } + } + } } void Server::Peer::updateChannel(ReplyBuilder & reply) @@ -716,16 +765,24 @@ vector ReplyBuilder::body() const } -optional WaitingRef::check(ReplyBuilder & reply) +optional WaitingRef::check() { if (auto r = storage.ref(ref.digest())) return *r; auto res = storage.copy(ref); - if (auto r = std::get_if(&res)) + if (auto r = get_if(&res)) return *r; missing = std::get>(res); + return nullopt; +} + +optional WaitingRef::check(ReplyBuilder & reply) +{ + if (auto r = check()) + return r; + for (const auto & d : missing) reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); diff --git a/src/network.h b/src/network.h index 40c06bf..8e99df1 100644 --- a/src/network.h +++ b/src/network.h @@ -45,6 +45,7 @@ struct Server::Peer variant, Identity> identity; + vector> identityUpdates; variant, @@ -133,6 +134,7 @@ struct WaitingRef const Server::Peer & peer; vector missing; + optional check(); optional check(ReplyBuilder &); }; -- cgit v1.2.3