diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-17 21:54:45 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-17 21:54:45 +0200 | 
| commit | 86b465cfccef5552aa111941fb74ec622e2e7c03 (patch) | |
| tree | b48b040e34be40f785702cbe02114b85ba658e67 | |
| parent | b09e73f0abcc386719a2235cc3ae61fb1cbfc5ca (diff) | |
Network: use digest instead of partial ref in protocol header
| -rw-r--r-- | src/network.cpp | 156 | ||||
| -rw-r--r-- | src/network/protocol.cpp | 39 | ||||
| -rw-r--r-- | src/network/protocol.h | 4 | 
3 files changed, 100 insertions, 99 deletions
| diff --git a/src/network.cpp b/src/network.cpp index ef06d6e..725ac50 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -115,7 +115,7 @@ void Server::addPeer(const string & node, const string & service) const  			{  				shared_lock lock(p->selfMutex);  				header.push_back(NetworkProtocol::Header::Item { -					NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() }); +					NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() });  			}  			peer.send(header, {}, false);  			return; @@ -234,7 +234,7 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const  	if (auto speer = p->speer.lock()) {  		NetworkProtocol::Header header({  			{ NetworkProtocol::Header::Type::ServiceType, uuid }, -				{ NetworkProtocol::Header::Type::ServiceRef, ref }, +				{ NetworkProtocol::Header::Type::ServiceRef, ref.digest() },  		});  		speer->send(header, { obj }, true);  		return true; @@ -442,6 +442,8 @@ void Server::Priv::doListen()  void Server::Priv::doAnnounce()  { +	auto pst = self.ref()->storage().derivePartialStorage(); +  	unique_lock lock(dataMutex);  	auto lastAnnounce = steady_clock::now() - announceInterval; @@ -451,10 +453,10 @@ void Server::Priv::doAnnounce()  		if (lastAnnounce + announceInterval < now) {  			shared_lock slock(selfMutex);  			NetworkProtocol::Header header({ -				{ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() } +				{ NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest() }  			}); -			vector<uint8_t> bytes = header.toObject().encode(); +			vector<uint8_t> bytes = header.toObject(pst).encode();  			for (const auto & in : bcastAddresses) {  				sockaddr_in sin = {}; @@ -544,56 +546,55 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  	for (auto & item : header.items) {  		switch (item.type) { -		case NetworkProtocol::Header::Type::Acknowledged: -			if (auto pref = std::get<PartialRef>(item.value)) { -				if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && -						std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest()) -					peer.finalizeChannel(reply, -						std::get<Stored<ChannelAccept>>(peer.channel)->data->channel()); -			} +		case NetworkProtocol::Header::Type::Acknowledged: { +			auto dgst = std::get<Digest>(item.value); +			if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && +					std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst) +				peer.finalizeChannel(reply, +					std::get<Stored<ChannelAccept>>(peer.channel)->data->channel());  			break; +		}  		case NetworkProtocol::Header::Type::DataRequest: { -			auto pref = std::get<PartialRef>(item.value); +			auto dgst = std::get<Digest>(item.value);  			if (holds_alternative<unique_ptr<Channel>>(peer.channel) || -					plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { -				if (auto ref = peer.tempStorage.ref(pref.digest())) { -					NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref }; -					reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref }); +					plaintextRefs.find(dgst) != plaintextRefs.end()) { +				if (auto ref = peer.tempStorage.ref(dgst)) { +					reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() });  					reply.body(*ref);  				}  			}  			break;  		} -		case NetworkProtocol::Header::Type::DataResponse: -			if (auto pref = std::get<PartialRef>(item.value)) { -				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); -				for (auto & pwref : waiting) { -					if (auto wref = pwref.lock()) { -						if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != -								wref->missing.end()) { -							if (wref->check(reply)) -								pwref.reset(); -						} +		case NetworkProtocol::Header::Type::DataResponse: { +			auto dgst = std::get<Digest>(item.value); +			reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); +			for (auto & pwref : waiting) { +				if (auto wref = pwref.lock()) { +					if (std::find(wref->missing.begin(), wref->missing.end(), dgst) != +							wref->missing.end()) { +						if (wref->check(reply)) +							pwref.reset();  					}  				} -				waiting.erase(std::remove_if(waiting.begin(), waiting.end(), -							[](auto & wref) { return wref.expired(); }), waiting.end());  			} +			waiting.erase(std::remove_if(waiting.begin(), waiting.end(), +						[](auto & wref) { return wref.expired(); }), waiting.end());  			break; +		}  		case NetworkProtocol::Header::Type::AnnounceSelf: { -			auto pref = std::get<PartialRef>(item.value); -			if (pref.digest() == self.ref()->digest()) +			auto dgst = std::get<Digest>(item.value); +			if (dgst == self.ref()->digest())  				break;  			if (holds_alternative<monostate>(peer.identity)) { -				reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()}); +				reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest()});  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage, -					.ref = pref, +					.ref = peer.partStorage.ref(dgst),  					.peer = peer,  					.missing = {},  				}); @@ -606,12 +607,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  		case NetworkProtocol::Header::Type::AnnounceUpdate:  			if (holds_alternative<Identity>(peer.identity)) { -				auto pref = std::get<PartialRef>(item.value); -				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); +				auto dgst = std::get<Digest>(item.value); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage, -					.ref = pref, +					.ref = peer.partStorage.ref(dgst),  					.peer = peer,  					.missing = {},  				}); @@ -621,46 +622,46 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  			}  			break; -		case NetworkProtocol::Header::Type::ChannelRequest: -			if (auto pref = std::get<PartialRef>(item.value)) { -				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); +		case NetworkProtocol::Header::Type::ChannelRequest: { +			auto dgst = std::get<Digest>(item.value); +			reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); -				if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && -						std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest()) -					break; +			if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && +					std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst) +				break; -				if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) -					break; +			if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) +				break; -				shared_ptr<WaitingRef> wref(new WaitingRef { -					.storage = peer.tempStorage, -					.ref = pref, -					.peer = peer, -					.missing = {}, -				}); -				waiting.push_back(wref); -				peer.channel = wref; -				wref->check(reply); -			} +			shared_ptr<WaitingRef> wref(new WaitingRef { +				.storage = peer.tempStorage, +				.ref = peer.partStorage.ref(dgst), +				.peer = peer, +				.missing = {}, +			}); +			waiting.push_back(wref); +			peer.channel = wref; +			wref->check(reply);  			break; +		} -		case NetworkProtocol::Header::Type::ChannelAccept: -			if (auto pref = std::get<PartialRef>(item.value)) { -				if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && -						std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest()) -					break; - -				auto cres = peer.tempStorage.copy(pref); -				if (auto r = std::get_if<Ref>(&cres)) { -					auto acc = ChannelAccept::load(*r); -					if (holds_alternative<Identity>(peer.identity) && -							acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { -						reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); -						peer.finalizeChannel(reply, acc.data->channel()); -					} +		case NetworkProtocol::Header::Type::ChannelAccept: { +			auto dgst = std::get<Digest>(item.value); +			if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && +					std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst) +				break; + +			auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); +			if (auto r = std::get_if<Ref>(&cres)) { +				auto acc = ChannelAccept::load(*r); +				if (holds_alternative<Identity>(peer.identity) && +						acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { +					reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); +					peer.finalizeChannel(reply, acc.data->channel());  				}  			}  			break; +		}  		case NetworkProtocol::Header::Type::ServiceType:  			if (!serviceType) @@ -677,9 +678,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  			if (!serviceType)  				break; -			auto pref = std::get<PartialRef>(item.value); +			auto dgst = std::get<Digest>(item.value); +			auto pref = peer.partStorage.ref(dgst);  			if (pref) -				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref }); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });  			shared_ptr<WaitingRef> wref(new WaitingRef {  				.storage = peer.tempStorage, @@ -706,10 +708,10 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)  			vector<NetworkProtocol::Header::Item> hitems;  			for (const auto & r : self.refs())  				hitems.push_back(NetworkProtocol::Header::Item { -					NetworkProtocol::Header::Type::AnnounceUpdate, r }); +					NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });  			for (const auto & r : self.updates())  				hitems.push_back(NetworkProtocol::Header::Item { -					NetworkProtocol::Header::Type::AnnounceUpdate, r }); +					NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });  			NetworkProtocol::Header header(hitems); @@ -723,7 +725,7 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj  {  	vector<uint8_t> data, part, out; -	part = header.toObject().encode(); +	part = header.toObject(partStorage).encode();  	data.insert(data.end(), part.begin(), part.end());  	for (const auto & obj : objs) {  		part = obj.encode(); @@ -786,7 +788,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  		auto req = Channel::generateRequest(tempStorage,  				server.self, std::get<Identity>(identity));  		channel.emplace<Stored<ChannelRequest>>(req); -		reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() }); +		reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() });  		reply.body(req.ref());  		reply.body(req->data.ref());  		reply.body(req->data->key.ref()); @@ -801,7 +803,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  					req->isSignedBy(std::get<Identity>(identity).keyMessage())) {  				if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {  					channel.emplace<Stored<ChannelAccept>>(*acc); -					reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() }); +					reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() });  					reply.body(acc->ref());  					reply.body(acc.value()->data.ref());  					reply.body(acc.value()->data->key.ref()); @@ -824,10 +826,10 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)  	vector<NetworkProtocol::Header::Item> hitems;  	for (const auto & r : server.self.refs())  		reply.header(NetworkProtocol::Header::Item { -			NetworkProtocol::Header::Type::AnnounceUpdate, r }); +			NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });  	for (const auto & r : server.self.updates())  		reply.header(NetworkProtocol::Header::Item { -			NetworkProtocol::Header::Type::AnnounceUpdate, r }); +			NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });  }  void Server::Peer::updateService(ReplyBuilder & reply) @@ -926,7 +928,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply)  		return r;  	for (const auto & d : missing) -		reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) }); +		reply.header({ NetworkProtocol::Header::Type::DataRequest, d });  	return nullopt;  } diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index c2c6c5d..fb3a5ea 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -215,9 +215,8 @@ bool NetworkProtocol::Header::Item::operator==(const Item & other) const  	if (value.index() != other.value.index())  		return false; -	if (holds_alternative<PartialRef>(value)) -		return std::get<PartialRef>(value).digest() == -			std::get<PartialRef>(other.value).digest(); +	if (holds_alternative<Digest>(value)) +		return std::get<Digest>(value) == std::get<Digest>(other.value);  	if (holds_alternative<UUID>(value))  		return std::get<UUID>(value) == std::get<UUID>(other.value); @@ -242,43 +241,43 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::Acknowledged, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "REQ") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::DataRequest, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "RSP") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::DataResponse, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "ANN") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::AnnounceSelf, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "ANU") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::AnnounceUpdate, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "CRQ") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::ChannelRequest, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "CAC") {  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::ChannelAccept, -					.value = *ref, +					.value = ref->digest(),  				});  		} else if (item.name == "STP") {  			if (auto val = item.asUUID()) @@ -290,7 +289,7 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj  			if (auto ref = item.asRef())  				items.emplace_back(Item {  					.type = Type::ServiceRef, -					.value = *ref, +					.value = ref->digest(),  				});  		}  	} @@ -298,38 +297,38 @@ optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObj  	return NetworkProtocol::Header(items);  } -PartialObject NetworkProtocol::Header::toObject() const +PartialObject NetworkProtocol::Header::toObject(const PartialStorage & st) const  {  	vector<PartialRecord::Item> ritems;  	for (const auto & item : items) {  		switch (item.type) {  		case Type::Acknowledged: -			ritems.emplace_back("ACK", std::get<PartialRef>(item.value)); +			ritems.emplace_back("ACK", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::DataRequest: -			ritems.emplace_back("REQ", std::get<PartialRef>(item.value)); +			ritems.emplace_back("REQ", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::DataResponse: -			ritems.emplace_back("RSP", std::get<PartialRef>(item.value)); +			ritems.emplace_back("RSP", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::AnnounceSelf: -			ritems.emplace_back("ANN", std::get<PartialRef>(item.value)); +			ritems.emplace_back("ANN", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::AnnounceUpdate: -			ritems.emplace_back("ANU", std::get<PartialRef>(item.value)); +			ritems.emplace_back("ANU", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::ChannelRequest: -			ritems.emplace_back("CRQ", std::get<PartialRef>(item.value)); +			ritems.emplace_back("CRQ", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::ChannelAccept: -			ritems.emplace_back("CAC", std::get<PartialRef>(item.value)); +			ritems.emplace_back("CAC", st.ref(std::get<Digest>(item.value)));  			break;  		case Type::ServiceType: @@ -337,7 +336,7 @@ PartialObject NetworkProtocol::Header::toObject() const  			break;  		case Type::ServiceRef: -			ritems.emplace_back("SRF", std::get<PartialRef>(item.value)); +			ritems.emplace_back("SRF", st.ref(std::get<Digest>(item.value)));  			break;  		}  	} diff --git a/src/network/protocol.h b/src/network/protocol.h index 8aa22a2..4794ba6 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -107,7 +107,7 @@ struct NetworkProtocol::Header  	struct Item {  		const Type type; -		const variant<PartialRef, UUID> value; +		const variant<Digest, UUID> value;  		bool operator==(const Item &) const;  		bool operator!=(const Item & other) const { return !(*this == other); } @@ -116,7 +116,7 @@ struct NetworkProtocol::Header  	Header(const vector<Item> & items): items(items) {}  	static optional<Header> load(const PartialRef &);  	static optional<Header> load(const PartialObject &); -	PartialObject toObject() const; +	PartialObject toObject(const PartialStorage &) const;  	const vector<Item> items;  }; |