summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2022-04-29 23:41:53 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2022-04-29 23:41:53 +0200
commit92564f487083e5be8f6c704521b74851a6cdeaf9 (patch)
tree0c7494d004065f6d816bdf14ef5f35c0d064a362
parent8b180ffd0551831931cdb61c925987f9014adad8 (diff)
Network: process and acknowledge identity updates
-rw-r--r--src/network.cpp95
-rw-r--r--src/network.h2
2 files changed, 78 insertions, 19 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 84c3083..2e3cf3b 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -13,6 +13,8 @@
#include <net/if.h>
#include <unistd.h>
+using std::get;
+using std::get_if;
using std::holds_alternative;
using std::move;
using std::runtime_error;
@@ -398,6 +400,7 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
.server = *this,
.addr = paddr,
.identity = monostate(),
+ .identityUpdates = {},
.channel = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
@@ -461,22 +464,37 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
if (pref.digest() == self.ref()->digest())
break;
- if (holds_alternative<monostate>(peer.identity))
+ if (holds_alternative<monostate>(peer.identity)) {
reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()});
- shared_ptr<WaitingRef> wref(new WaitingRef {
- .storage = peer.tempStorage,
- .ref = pref,
- .peer = peer,
- .missing = {},
- });
- waiting.push_back(wref);
- peer.identity = wref;
- wref->check(reply);
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.identity = wref;
+ wref->check(reply);
+ }
break;
}
case TransportHeader::Type::AnnounceUpdate:
+ if (holds_alternative<Identity>(peer.identity)) {
+ auto pref = std::get<PartialRef>(item.value);
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
+
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.identityUpdates.push_back(wref);
+ wref->check(reply);
+ }
break;
case TransportHeader::Type::ChannelRequest:
@@ -554,12 +572,18 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
{
scoped_lock lock(dataMutex);
if (auto id = head->identity()) {
- if (id->ref()->digest() != self.ref()->digest()) {
+ if (*id != self) {
self = *id;
- TransportHeader header({
- { TransportHeader::Type::AnnounceSelf, *self.ref() }
- });
+ vector<TransportHeader::Item> hitems;
+ for (const auto & r : self.refs())
+ hitems.push_back(TransportHeader::Item {
+ TransportHeader::Type::AnnounceUpdate, r });
+ for (const auto & r : self.updates())
+ hitems.push_back(TransportHeader::Item {
+ TransportHeader::Type::AnnounceUpdate, r });
+
+ TransportHeader header(hitems);
for (const auto & peer : peers)
peer->send(header, { **self.ref() }, false);
@@ -590,15 +614,40 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o
(sockaddr *) &addr, sizeof(addr));
}
-void Server::Peer::updateIdentity(ReplyBuilder & reply)
+void Server::Peer::updateIdentity(ReplyBuilder &)
{
- if (holds_alternative<shared_ptr<WaitingRef>>(identity))
- if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply))
+ if (holds_alternative<shared_ptr<WaitingRef>>(identity)) {
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check())
if (auto id = Identity::load(*ref)) {
identity.emplace<Identity>(*id);
if (lpeer)
lpeer->notifyWatchers();
}
+ }
+ else if (holds_alternative<Identity>(identity)) {
+ if (!identityUpdates.empty()) {
+ decltype(identityUpdates) keep;
+ vector<Stored<Signed<IdentityData>>> updates;
+
+ for (auto wref : identityUpdates) {
+ if (auto ref = wref->check())
+ updates.push_back(Stored<Signed<IdentityData>>::load(*ref));
+ else
+ keep.push_back(move(wref));
+ }
+
+ identityUpdates = move(keep);
+
+ if (!updates.empty()) {
+ auto nid = get<Identity>(identity).update(updates);
+ if (nid != get<Identity>(identity)) {
+ identity = move(nid);
+ if (lpeer)
+ lpeer->notifyWatchers();
+ }
+ }
+ }
+ }
}
void Server::Peer::updateChannel(ReplyBuilder & reply)
@@ -716,16 +765,24 @@ vector<Object> ReplyBuilder::body() const
}
-optional<Ref> WaitingRef::check(ReplyBuilder & reply)
+optional<Ref> WaitingRef::check()
{
if (auto r = storage.ref(ref.digest()))
return *r;
auto res = storage.copy(ref);
- if (auto r = std::get_if<Ref>(&res))
+ if (auto r = get_if<Ref>(&res))
return *r;
missing = std::get<vector<Digest>>(res);
+ return nullopt;
+}
+
+optional<Ref> WaitingRef::check(ReplyBuilder & reply)
+{
+ if (auto r = check())
+ return r;
+
for (const auto & d : missing)
reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
diff --git a/src/network.h b/src/network.h
index 40c06bf..8e99df1 100644
--- a/src/network.h
+++ b/src/network.h
@@ -45,6 +45,7 @@ struct Server::Peer
variant<monostate,
shared_ptr<struct WaitingRef>,
Identity> identity;
+ vector<shared_ptr<WaitingRef>> identityUpdates;
variant<monostate,
Stored<ChannelRequest>,
@@ -133,6 +134,7 @@ struct WaitingRef
const Server::Peer & peer;
vector<Digest> missing;
+ optional<Ref> check();
optional<Ref> check(ReplyBuilder &);
};