diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-16 20:53:58 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-16 21:49:39 +0200 | 
| commit | b09e73f0abcc386719a2235cc3ae61fb1cbfc5ca (patch) | |
| tree | 462085ac8fb592f4a2c07446058a7e761b51dfac /src | |
| parent | 2ed8103ff1c0fca7372b3c3888f590ba41c525e6 (diff) | |
Move network header definitions to protocol module
Diffstat (limited to 'src')
| -rw-r--r-- | src/network.cpp | 229 | ||||
| -rw-r--r-- | src/network.h | 42 | ||||
| -rw-r--r-- | src/network/protocol.cpp | 150 | ||||
| -rw-r--r-- | src/network/protocol.h | 36 | 
4 files changed, 237 insertions, 220 deletions
| diff --git a/src/network.cpp b/src/network.cpp index 786e752..ef06d6e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -111,11 +111,11 @@ void Server::addPeer(const string & node, const string & service) const  		if (rp->ai_family == AF_INET6) {  			Peer & peer = p->getPeer(*(sockaddr_in6 *)rp->ai_addr); -			vector<TransportHeader::Item> header; +			vector<NetworkProtocol::Header::Item> header;  			{  				shared_lock lock(p->selfMutex); -				header.push_back(TransportHeader::Item { -					TransportHeader::Type::AnnounceSelf, *p->self.ref() }); +				header.push_back(NetworkProtocol::Header::Item { +					NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() });  			}  			peer.send(header, {}, false);  			return; @@ -232,9 +232,9 @@ bool Peer::send(UUID uuid, const Object & obj) const  bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const  {  	if (auto speer = p->speer.lock()) { -		TransportHeader header({ -			{ TransportHeader::Type::ServiceType, uuid }, -				{ TransportHeader::Type::ServiceRef, ref }, +		NetworkProtocol::Header header({ +			{ NetworkProtocol::Header::Type::ServiceType, uuid }, +				{ NetworkProtocol::Header::Type::ServiceRef, ref },  		});  		speer->send(header, { obj }, true);  		return true; @@ -411,7 +411,7 @@ void Server::Priv::doListen()  		if (auto dec = PartialObject::decodePrefix(peer->partStorage,  				current->begin(), current->end())) { -			if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { +			if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) {  				auto pos = std::get<1>(*dec);  				while (auto cdec = PartialObject::decodePrefix(peer->partStorage,  							pos, current->end())) { @@ -430,7 +430,7 @@ void Server::Priv::doListen()  				peer->updateService(reply);  				if (!reply.header().empty()) -					peer->send(TransportHeader(reply.header()), reply.body(), false); +					peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false);  				peer->trySendOutQueue();  			} @@ -450,8 +450,8 @@ void Server::Priv::doAnnounce()  		if (lastAnnounce + announceInterval < now) {  			shared_lock slock(selfMutex); -			TransportHeader header({ -				{ TransportHeader::Type::AnnounceSelf, *self.ref() } +			NetworkProtocol::Header header({ +				{ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() }  			});  			vector<uint8_t> bytes = header.toObject().encode(); @@ -534,7 +534,7 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)  	return *peer;  } -void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & header, ReplyBuilder & reply) +void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Header & header, ReplyBuilder & reply)  {  	unordered_set<Digest> plaintextRefs;  	for (const auto & obj : collectStoredObjects(Stored<Object>::load(*self.ref()))) @@ -544,7 +544,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  	for (auto & item : header.items) {  		switch (item.type) { -		case TransportHeader::Type::Acknowledged: +		case NetworkProtocol::Header::Type::Acknowledged:  			if (auto pref = std::get<PartialRef>(item.value)) {  				if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&  						std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest()) @@ -553,22 +553,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			}  			break; -		case TransportHeader::Type::DataRequest: { +		case NetworkProtocol::Header::Type::DataRequest: {  			auto pref = std::get<PartialRef>(item.value);  			if (holds_alternative<unique_ptr<Channel>>(peer.channel) ||  					plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {  				if (auto ref = peer.tempStorage.ref(pref.digest())) { -					TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; -					reply.header({ TransportHeader::Type::DataResponse, *ref }); +					NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref }; +					reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref });  					reply.body(*ref);  				}  			}  			break;  		} -		case TransportHeader::Type::DataResponse: +		case NetworkProtocol::Header::Type::DataResponse:  			if (auto pref = std::get<PartialRef>(item.value)) { -				reply.header({ TransportHeader::Type::Acknowledged, pref }); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });  				for (auto & pwref : waiting) {  					if (auto wref = pwref.lock()) {  						if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != @@ -583,13 +583,13 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			}  			break; -		case TransportHeader::Type::AnnounceSelf: { +		case NetworkProtocol::Header::Type::AnnounceSelf: {  			auto pref = std::get<PartialRef>(item.value);  			if (pref.digest() == self.ref()->digest())  				break;  			if (holds_alternative<monostate>(peer.identity)) { -				reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()}); +				reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()});  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage, @@ -604,10 +604,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			break;  		} -		case TransportHeader::Type::AnnounceUpdate: +		case NetworkProtocol::Header::Type::AnnounceUpdate:  			if (holds_alternative<Identity>(peer.identity)) {  				auto pref = std::get<PartialRef>(item.value); -				reply.header({ TransportHeader::Type::Acknowledged, pref }); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage, @@ -621,9 +621,9 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			}  			break; -		case TransportHeader::Type::ChannelRequest: +		case NetworkProtocol::Header::Type::ChannelRequest:  			if (auto pref = std::get<PartialRef>(item.value)) { -				reply.header({ TransportHeader::Type::Acknowledged, pref }); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });  				if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&  						std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest()) @@ -644,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			}  			break; -		case TransportHeader::Type::ChannelAccept: +		case NetworkProtocol::Header::Type::ChannelAccept:  			if (auto pref = std::get<PartialRef>(item.value)) {  				if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&  						std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest()) @@ -655,22 +655,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  					auto acc = ChannelAccept::load(*r);  					if (holds_alternative<Identity>(peer.identity) &&  							acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) { -						reply.header({ TransportHeader::Type::Acknowledged, pref }); +						reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });  						peer.finalizeChannel(reply, acc.data->channel());  					}  				}  			}  			break; -		case TransportHeader::Type::ServiceType: +		case NetworkProtocol::Header::Type::ServiceType:  			if (!serviceType)  				serviceType = std::get<UUID>(item.value);  			break; -		case TransportHeader::Type::ServiceRef: +		case NetworkProtocol::Header::Type::ServiceRef:  			if (!serviceType)  				for (auto & item : header.items) -					if (item.type == TransportHeader::Type::ServiceType) { +					if (item.type == NetworkProtocol::Header::Type::ServiceType) {  						serviceType = std::get<UUID>(item.value);  						break;  					} @@ -679,7 +679,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  			auto pref = std::get<PartialRef>(item.value);  			if (pref) -				reply.header({ TransportHeader::Type::Acknowledged, pref }); +				reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });  			shared_ptr<WaitingRef> wref(new WaitingRef {  				.storage = peer.tempStorage, @@ -703,15 +703,15 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)  		if (*id != self) {  			self = *id; -			vector<TransportHeader::Item> hitems; +			vector<NetworkProtocol::Header::Item> hitems;  			for (const auto & r : self.refs()) -				hitems.push_back(TransportHeader::Item { -					TransportHeader::Type::AnnounceUpdate, r }); +				hitems.push_back(NetworkProtocol::Header::Item { +					NetworkProtocol::Header::Type::AnnounceUpdate, r });  			for (const auto & r : self.updates()) -				hitems.push_back(TransportHeader::Item { -					TransportHeader::Type::AnnounceUpdate, r }); +				hitems.push_back(NetworkProtocol::Header::Item { +					NetworkProtocol::Header::Type::AnnounceUpdate, r }); -			TransportHeader header(hitems); +			NetworkProtocol::Header header(hitems);  			for (const auto & peer : peers)  				peer->send(header, { **self.ref() }, false); @@ -719,7 +719,7 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)  	}  } -void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs, bool secure) +void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure)  {  	vector<uint8_t> data, part, out; @@ -786,7 +786,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  		auto req = Channel::generateRequest(tempStorage,  				server.self, std::get<Identity>(identity));  		channel.emplace<Stored<ChannelRequest>>(req); -		reply.header({ TransportHeader::Type::ChannelRequest, req.ref() }); +		reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() });  		reply.body(req.ref());  		reply.body(req->data.ref());  		reply.body(req->data->key.ref()); @@ -801,7 +801,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  					req->isSignedBy(std::get<Identity>(identity).keyMessage())) {  				if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {  					channel.emplace<Stored<ChannelAccept>>(*acc); -					reply.header({ TransportHeader::Type::ChannelAccept, acc->ref() }); +					reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() });  					reply.body(acc->ref());  					reply.body(acc.value()->data.ref());  					reply.body(acc.value()->data->key.ref()); @@ -821,13 +821,13 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)  {  	channel.emplace<unique_ptr<Channel>>(move(ch)); -	vector<TransportHeader::Item> hitems; +	vector<NetworkProtocol::Header::Item> hitems;  	for (const auto & r : server.self.refs()) -		reply.header(TransportHeader::Item { -			TransportHeader::Type::AnnounceUpdate, r }); +		reply.header(NetworkProtocol::Header::Item { +			NetworkProtocol::Header::Type::AnnounceUpdate, r });  	for (const auto & r : server.self.updates()) -		reply.header(TransportHeader::Item { -			TransportHeader::Type::AnnounceUpdate, r }); +		reply.header(NetworkProtocol::Header::Item { +			NetworkProtocol::Header::Type::AnnounceUpdate, r });  }  void Server::Peer::updateService(ReplyBuilder & reply) @@ -881,7 +881,7 @@ void Server::Peer::trySendOutQueue()  } -void ReplyBuilder::header(TransportHeader::Item && item) +void ReplyBuilder::header(NetworkProtocol::Header::Item && item)  {  	for (const auto & x : mheader)  		if (x == item) @@ -926,146 +926,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply)  		return r;  	for (const auto & d : missing) -		reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); +		reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) });  	return nullopt;  } - - -bool TransportHeader::Item::operator==(const Item & other) const -{ -	if (type != other.type) -		return false; - -	if (value.index() != other.value.index()) -		return false; - -	if (holds_alternative<PartialRef>(value)) -		return std::get<PartialRef>(value).digest() == -			std::get<PartialRef>(other.value).digest(); - -	if (holds_alternative<UUID>(value)) -		return std::get<UUID>(value) == std::get<UUID>(other.value); - -	throw runtime_error("unhandled transport header item type"); -} - -optional<TransportHeader> TransportHeader::load(const PartialRef & ref) -{ -	return load(*ref); -} - -optional<TransportHeader> TransportHeader::load(const PartialObject & obj) -{ -	auto rec = obj.asRecord(); -	if (!rec) -		return nullopt; - -	vector<Item> items; -	for (const auto & item : rec->items()) { -		if (item.name == "ACK") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::Acknowledged, -					.value = *ref, -				}); -		} else if (item.name == "REQ") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::DataRequest, -					.value = *ref, -				}); -		} else if (item.name == "RSP") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::DataResponse, -					.value = *ref, -				}); -		} else if (item.name == "ANN") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::AnnounceSelf, -					.value = *ref, -				}); -		} else if (item.name == "ANU") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::AnnounceUpdate, -					.value = *ref, -				}); -		} else if (item.name == "CRQ") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::ChannelRequest, -					.value = *ref, -				}); -		} else if (item.name == "CAC") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::ChannelAccept, -					.value = *ref, -				}); -		} else if (item.name == "STP") { -			if (auto val = item.asUUID()) -				items.emplace_back(Item { -					.type = Type::ServiceType, -					.value = *val, -				}); -		} else if (item.name == "SRF") { -			if (auto ref = item.asRef()) -				items.emplace_back(Item { -					.type = Type::ServiceRef, -					.value = *ref, -				}); -		} -	} - -	return TransportHeader(items); -} - -PartialObject TransportHeader::toObject() const -{ -	vector<PartialRecord::Item> ritems; - -	for (const auto & item : items) { -		switch (item.type) { -		case Type::Acknowledged: -			ritems.emplace_back("ACK", std::get<PartialRef>(item.value)); -			break; - -		case Type::DataRequest: -			ritems.emplace_back("REQ", std::get<PartialRef>(item.value)); -			break; - -		case Type::DataResponse: -			ritems.emplace_back("RSP", std::get<PartialRef>(item.value)); -			break; - -		case Type::AnnounceSelf: -			ritems.emplace_back("ANN", std::get<PartialRef>(item.value)); -			break; - -		case Type::AnnounceUpdate: -			ritems.emplace_back("ANU", std::get<PartialRef>(item.value)); -			break; - -		case Type::ChannelRequest: -			ritems.emplace_back("CRQ", std::get<PartialRef>(item.value)); -			break; - -		case Type::ChannelAccept: -			ritems.emplace_back("CAC", std::get<PartialRef>(item.value)); -			break; - -		case Type::ServiceType: -			ritems.emplace_back("STP", std::get<UUID>(item.value)); -			break; - -		case Type::ServiceRef: -			ritems.emplace_back("SRF", std::get<PartialRef>(item.value)); -			break; -		} -	} - -	return PartialObject(PartialRecord(std::move(ritems))); -} diff --git a/src/network.h b/src/network.h index 74231bf..c3a2074 100644 --- a/src/network.h +++ b/src/network.h @@ -65,7 +65,7 @@ struct Server::Peer  	shared_ptr<erebos::Peer::Priv> lpeer = nullptr; -	void send(const struct TransportHeader &, const vector<Object> &, bool secure); +	void send(const NetworkProtocol::Header &, const vector<Object> &, bool secure);  	void updateIdentity(ReplyBuilder &);  	void updateChannel(ReplyBuilder &);  	void finalizeChannel(ReplyBuilder &, unique_ptr<Channel>); @@ -91,47 +91,17 @@ struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>  	void push(const shared_ptr<Server::Peer> &);  }; -struct TransportHeader -{ -	enum class Type { -		Acknowledged, -		DataRequest, -		DataResponse, -		AnnounceSelf, -		AnnounceUpdate, -		ChannelRequest, -		ChannelAccept, -		ServiceType, -		ServiceRef, -	}; - -	struct Item { -		const Type type; -		const variant<PartialRef, UUID> value; - -		bool operator==(const Item &) const; -		bool operator!=(const Item & other) const { return !(*this == other); } -	}; - -	TransportHeader(const vector<Item> & items): items(items) {} -	static optional<TransportHeader> load(const PartialRef &); -	static optional<TransportHeader> load(const PartialObject &); -	PartialObject toObject() const; - -	const vector<Item> items; -}; -  class ReplyBuilder  {  public: -	void header(TransportHeader::Item &&); +	void header(NetworkProtocol::Header::Item &&);  	void body(const Ref &); -	const vector<TransportHeader::Item> & header() const { return mheader; } +	const vector<NetworkProtocol::Header::Item> & header() const { return mheader; }  	vector<Object> body() const;  private: -	vector<TransportHeader::Item> mheader; +	vector<NetworkProtocol::Header::Item> mheader;  	vector<Ref> mbody;  }; @@ -160,7 +130,7 @@ struct Server::Priv  	Peer * findPeer(NetworkProtocol::Connection::Id cid) const;  	Peer & getPeer(const sockaddr_in6 & paddr);  	Peer & addPeer(NetworkProtocol::Connection conn); -	void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &); +	void handlePacket(Peer &, const NetworkProtocol::Header &, ReplyBuilder &);  	void handleLocalHeadChange(const Head<LocalState> &); @@ -181,7 +151,7 @@ struct Server::Priv  	vector<shared_ptr<Peer>> peers;  	PeerList plist; -	vector<struct TransportHeader> outgoing; +	vector<struct NetworkProtocol::Header> outgoing;  	vector<weak_ptr<WaitingRef>> waiting;  	NetworkProtocol protocol; diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index c247bf0..c2c6c5d 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -7,7 +7,10 @@  #include <mutex>  #include <system_error> +using std::holds_alternative;  using std::move; +using std::nullopt; +using std::runtime_error;  using std::scoped_lock;  namespace erebos { @@ -122,6 +125,10 @@ void NetworkProtocol::shutdown()  } +/******************************************************************************/ +/* Connection                                                                 */ +/******************************************************************************/ +  NetworkProtocol::Connection::Id NetworkProtocol::ConnectionPriv::id() const  {  	return reinterpret_cast<uintptr_t>(this); @@ -195,4 +202,147 @@ void NetworkProtocol::Connection::close()  	p = nullptr;  } + +/******************************************************************************/ +/* Header                                                                     */ +/******************************************************************************/ + +bool NetworkProtocol::Header::Item::operator==(const Item & other) const +{ +	if (type != other.type) +		return false; + +	if (value.index() != other.value.index()) +		return false; + +	if (holds_alternative<PartialRef>(value)) +		return std::get<PartialRef>(value).digest() == +			std::get<PartialRef>(other.value).digest(); + +	if (holds_alternative<UUID>(value)) +		return std::get<UUID>(value) == std::get<UUID>(other.value); + +	throw runtime_error("unhandled network header item type"); +} + +optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialRef & ref) +{ +	return load(*ref); +} + +optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObject & obj) +{ +	auto rec = obj.asRecord(); +	if (!rec) +		return nullopt; + +	vector<Item> items; +	for (const auto & item : rec->items()) { +		if (item.name == "ACK") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::Acknowledged, +					.value = *ref, +				}); +		} else if (item.name == "REQ") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::DataRequest, +					.value = *ref, +				}); +		} else if (item.name == "RSP") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::DataResponse, +					.value = *ref, +				}); +		} else if (item.name == "ANN") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::AnnounceSelf, +					.value = *ref, +				}); +		} else if (item.name == "ANU") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::AnnounceUpdate, +					.value = *ref, +				}); +		} else if (item.name == "CRQ") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::ChannelRequest, +					.value = *ref, +				}); +		} else if (item.name == "CAC") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::ChannelAccept, +					.value = *ref, +				}); +		} else if (item.name == "STP") { +			if (auto val = item.asUUID()) +				items.emplace_back(Item { +					.type = Type::ServiceType, +					.value = *val, +				}); +		} else if (item.name == "SRF") { +			if (auto ref = item.asRef()) +				items.emplace_back(Item { +					.type = Type::ServiceRef, +					.value = *ref, +				}); +		} +	} + +	return NetworkProtocol::Header(items); +} + +PartialObject NetworkProtocol::Header::toObject() const +{ +	vector<PartialRecord::Item> ritems; + +	for (const auto & item : items) { +		switch (item.type) { +		case Type::Acknowledged: +			ritems.emplace_back("ACK", std::get<PartialRef>(item.value)); +			break; + +		case Type::DataRequest: +			ritems.emplace_back("REQ", std::get<PartialRef>(item.value)); +			break; + +		case Type::DataResponse: +			ritems.emplace_back("RSP", std::get<PartialRef>(item.value)); +			break; + +		case Type::AnnounceSelf: +			ritems.emplace_back("ANN", std::get<PartialRef>(item.value)); +			break; + +		case Type::AnnounceUpdate: +			ritems.emplace_back("ANU", std::get<PartialRef>(item.value)); +			break; + +		case Type::ChannelRequest: +			ritems.emplace_back("CRQ", std::get<PartialRef>(item.value)); +			break; + +		case Type::ChannelAccept: +			ritems.emplace_back("CAC", std::get<PartialRef>(item.value)); +			break; + +		case Type::ServiceType: +			ritems.emplace_back("STP", std::get<UUID>(item.value)); +			break; + +		case Type::ServiceRef: +			ritems.emplace_back("SRF", std::get<PartialRef>(item.value)); +			break; +		} +	} + +	return PartialObject(PartialRecord(std::move(ritems))); +} +  } diff --git a/src/network/protocol.h b/src/network/protocol.h index a9bbaff..8aa22a2 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -1,5 +1,7 @@  #pragma once +#include <erebos/storage.h> +  #include <netinet/in.h>  #include <cstdint> @@ -7,10 +9,12 @@  #include <mutex>  #include <variant>  #include <vector> +#include <optional>  namespace erebos {  using std::mutex; +using std::optional;  using std::unique_ptr;  using std::variant;  using std::vector; @@ -28,6 +32,8 @@ public:  	class Connection; +	struct Header; +  	struct NewConnection;  	struct ConnectionReadReady;  	struct ProtocolClosed {}; @@ -85,4 +91,34 @@ private:  struct NetworkProtocol::NewConnection { Connection conn; };  struct NetworkProtocol::ConnectionReadReady { Connection::Id id; }; +struct NetworkProtocol::Header +{ +	enum class Type { +		Acknowledged, +		DataRequest, +		DataResponse, +		AnnounceSelf, +		AnnounceUpdate, +		ChannelRequest, +		ChannelAccept, +		ServiceType, +		ServiceRef, +	}; + +	struct Item { +		const Type type; +		const variant<PartialRef, UUID> value; + +		bool operator==(const Item &) const; +		bool operator!=(const Item & other) const { return !(*this == other); } +	}; + +	Header(const vector<Item> & items): items(items) {} +	static optional<Header> load(const PartialRef &); +	static optional<Header> load(const PartialObject &); +	PartialObject toObject() const; + +	const vector<Item> items; +}; +  } |