diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2022-04-29 23:41:53 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2022-04-29 23:41:53 +0200 | 
| commit | 92564f487083e5be8f6c704521b74851a6cdeaf9 (patch) | |
| tree | 0c7494d004065f6d816bdf14ef5f35c0d064a362 /src | |
| parent | 8b180ffd0551831931cdb61c925987f9014adad8 (diff) | |
Network: process and acknowledge identity updates
Diffstat (limited to 'src')
| -rw-r--r-- | src/network.cpp | 95 | ||||
| -rw-r--r-- | src/network.h | 2 | 
2 files changed, 78 insertions, 19 deletions
| diff --git a/src/network.cpp b/src/network.cpp index 84c3083..2e3cf3b 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -13,6 +13,8 @@  #include <net/if.h>  #include <unistd.h> +using std::get; +using std::get_if;  using std::holds_alternative;  using std::move;  using std::runtime_error; @@ -398,6 +400,7 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in & paddr)  		.server = *this,  		.addr = paddr,  		.identity = monostate(), +		.identityUpdates = {},  		.channel = monostate(),  		.tempStorage = st,  		.partStorage = st.derivePartialStorage(), @@ -461,22 +464,37 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			if (pref.digest() == self.ref()->digest())  				break; -			if (holds_alternative<monostate>(peer.identity)) +			if (holds_alternative<monostate>(peer.identity)) {  				reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); -			shared_ptr<WaitingRef> wref(new WaitingRef { -				.storage = peer.tempStorage, -				.ref = pref, -				.peer = peer, -				.missing = {}, -			}); -			waiting.push_back(wref); -			peer.identity = wref; -			wref->check(reply); +				shared_ptr<WaitingRef> wref(new WaitingRef { +					.storage = peer.tempStorage, +					.ref = pref, +					.peer = peer, +					.missing = {}, +				}); +				waiting.push_back(wref); +				peer.identity = wref; +				wref->check(reply); +			}  			break;  		}  		case TransportHeader::Type::AnnounceUpdate: +			if (holds_alternative<Identity>(peer.identity)) { +				auto pref = std::get<PartialRef>(item.value); +				reply.header({ TransportHeader::Type::Acknowledged, pref }); + +				shared_ptr<WaitingRef> wref(new WaitingRef { +					.storage = peer.tempStorage, +					.ref = pref, +					.peer = peer, +					.missing = {}, +				}); +				waiting.push_back(wref); +				peer.identityUpdates.push_back(wref); +				wref->check(reply); +			}  			break;  		case TransportHeader::Type::ChannelRequest: @@ -554,12 +572,18 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)  {  	scoped_lock lock(dataMutex);  	if (auto id = head->identity()) { -		if (id->ref()->digest() != self.ref()->digest()) { +		if (*id != self) {  			self = *id; -			TransportHeader header({ -				{ TransportHeader::Type::AnnounceSelf, *self.ref() } -			}); +			vector<TransportHeader::Item> hitems; +			for (const auto & r : self.refs()) +				hitems.push_back(TransportHeader::Item { +					TransportHeader::Type::AnnounceUpdate, r }); +			for (const auto & r : self.updates()) +				hitems.push_back(TransportHeader::Item { +					TransportHeader::Type::AnnounceUpdate, r }); + +			TransportHeader header(hitems);  			for (const auto & peer : peers)  				peer->send(header, { **self.ref() }, false); @@ -590,15 +614,40 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o  				(sockaddr *) &addr, sizeof(addr));  } -void Server::Peer::updateIdentity(ReplyBuilder & reply) +void Server::Peer::updateIdentity(ReplyBuilder &)  { -	if (holds_alternative<shared_ptr<WaitingRef>>(identity)) -		if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply)) +	if (holds_alternative<shared_ptr<WaitingRef>>(identity)) { +		if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check())  			if (auto id = Identity::load(*ref)) {  				identity.emplace<Identity>(*id);  				if (lpeer)  					lpeer->notifyWatchers();  			} +	} +	else if (holds_alternative<Identity>(identity)) { +		if (!identityUpdates.empty()) { +			decltype(identityUpdates) keep; +			vector<Stored<Signed<IdentityData>>> updates; + +			for (auto wref : identityUpdates) { +				if (auto ref = wref->check()) +					updates.push_back(Stored<Signed<IdentityData>>::load(*ref)); +				else +					keep.push_back(move(wref)); +			} + +			identityUpdates = move(keep); + +			if (!updates.empty()) { +				auto nid = get<Identity>(identity).update(updates); +				if (nid != get<Identity>(identity)) { +					identity = move(nid); +					if (lpeer) +						lpeer->notifyWatchers(); +				} +			} +		} +	}  }  void Server::Peer::updateChannel(ReplyBuilder & reply) @@ -716,16 +765,24 @@ vector<Object> ReplyBuilder::body() const  } -optional<Ref> WaitingRef::check(ReplyBuilder & reply) +optional<Ref> WaitingRef::check()  {  	if (auto r = storage.ref(ref.digest()))  		return *r;  	auto res = storage.copy(ref); -	if (auto r = std::get_if<Ref>(&res)) +	if (auto r = get_if<Ref>(&res))  		return *r;  	missing = std::get<vector<Digest>>(res); +	return nullopt; +} + +optional<Ref> WaitingRef::check(ReplyBuilder & reply) +{ +	if (auto r = check()) +		return r; +  	for (const auto & d : missing)  		reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); diff --git a/src/network.h b/src/network.h index 40c06bf..8e99df1 100644 --- a/src/network.h +++ b/src/network.h @@ -45,6 +45,7 @@ struct Server::Peer  	variant<monostate,  		shared_ptr<struct WaitingRef>,  		Identity> identity; +	vector<shared_ptr<WaitingRef>> identityUpdates;  	variant<monostate,  		Stored<ChannelRequest>, @@ -133,6 +134,7 @@ struct WaitingRef  	const Server::Peer & peer;  	vector<Digest> missing; +	optional<Ref> check();  	optional<Ref> check(ReplyBuilder &);  }; |