diff options
| -rw-r--r-- | include/erebos/network.h | 1 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/network.cpp | 69 | ||||
| -rw-r--r-- | src/network.h | 32 | ||||
| -rw-r--r-- | src/network/channel.cpp (renamed from src/channel.cpp) | 0 | ||||
| -rw-r--r-- | src/network/channel.h (renamed from src/channel.h) | 2 | ||||
| -rw-r--r-- | src/network/protocol.cpp | 7 | ||||
| -rw-r--r-- | src/network/protocol.h | 35 | 
8 files changed, 72 insertions, 76 deletions
| diff --git a/include/erebos/network.h b/include/erebos/network.h index 6a3112a..2761a40 100644 --- a/include/erebos/network.h +++ b/include/erebos/network.h @@ -99,7 +99,6 @@ public:  	string addressStr() const;  	uint16_t port() const; -	bool hasChannel() const;  	bool send(UUID, const Ref &) const;  	bool send(UUID, const Object &) const; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 478fc50..fff6242 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,13 +4,13 @@ include_directories(  add_library(erebos  	attach.cpp -	channel.cpp  	contact.cpp  	frp.cpp  	identity.cpp  	merge.cpp  	message.cpp  	network.cpp +	network/channel.cpp  	network/protocol.cpp  	pairing.cpp  	pubkey.cpp diff --git a/src/network.cpp b/src/network.cpp index 725ac50..455496c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -207,13 +207,6 @@ void Peer::Priv::notifyWatchers()  	}  } -bool Peer::hasChannel() const -{ -	if (auto speer = p->speer.lock()) -		return holds_alternative<unique_ptr<Channel>>(speer->channel); -	return false; -} -  bool Peer::send(UUID uuid, const Ref & ref) const  {  	return send(uuid, ref, *ref); @@ -396,13 +389,13 @@ void Server::Priv::doListen()  			continue;  		current = &buf; -		if (holds_alternative<unique_ptr<Channel>>(peer->channel)) { -			if (auto dec = std::get<unique_ptr<Channel>>(peer->channel)->decrypt(buf)) { +		if (holds_alternative<unique_ptr<Channel>>(peer->connection.channel())) { +			if (auto dec = std::get<unique_ptr<Channel>>(peer->connection.channel())->decrypt(buf)) {  				decrypted = std::move(*dec);  				current = &decrypted;  			} -		} else if (holds_alternative<Stored<ChannelAccept>>(peer->channel)) { -			if (auto dec = std::get<Stored<ChannelAccept>>(peer->channel)-> +		} else if (holds_alternative<Stored<ChannelAccept>>(peer->connection.channel())) { +			if (auto dec = std::get<Stored<ChannelAccept>>(peer->connection.channel())->  					data->channel()->decrypt(buf)) {  				decrypted = std::move(*dec);  				current = &decrypted; @@ -508,7 +501,6 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr)  		.connection = protocol.connect(paddr),  		.identity = monostate(),  		.identityUpdates = {}, -		.channel = monostate(),  		.tempStorage = st,  		.partStorage = st.derivePartialStorage(),  		}); @@ -527,7 +519,6 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)  		.connection = move(conn),  		.identity = monostate(),  		.identityUpdates = {}, -		.channel = monostate(),  		.tempStorage = st,  		.partStorage = st.derivePartialStorage(),  		}); @@ -548,16 +539,16 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  		switch (item.type) {  		case NetworkProtocol::Header::Type::Acknowledged: {  			auto dgst = std::get<Digest>(item.value); -			if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && -					std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst) +			if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) && +					std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() == dgst)  				peer.finalizeChannel(reply, -					std::get<Stored<ChannelAccept>>(peer.channel)->data->channel()); +					std::get<Stored<ChannelAccept>>(peer.connection.channel())->data->channel());  			break;  		}  		case NetworkProtocol::Header::Type::DataRequest: {  			auto dgst = std::get<Digest>(item.value); -			if (holds_alternative<unique_ptr<Channel>>(peer.channel) || +			if (holds_alternative<unique_ptr<Channel>>(peer.connection.channel()) ||  					plaintextRefs.find(dgst) != plaintextRefs.end()) {  				if (auto ref = peer.tempStorage.ref(dgst)) {  					reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() }); @@ -595,7 +586,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage,  					.ref = peer.partStorage.ref(dgst), -					.peer = peer,  					.missing = {},  				});  				waiting.push_back(wref); @@ -613,7 +603,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  				shared_ptr<WaitingRef> wref(new WaitingRef {  					.storage = peer.tempStorage,  					.ref = peer.partStorage.ref(dgst), -					.peer = peer,  					.missing = {},  				});  				waiting.push_back(wref); @@ -626,29 +615,28 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  			auto dgst = std::get<Digest>(item.value);  			reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); -			if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && -					std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst) +			if (holds_alternative<Stored<ChannelRequest>>(peer.connection.channel()) && +					std::get<Stored<ChannelRequest>>(peer.connection.channel()).ref().digest() < dgst)  				break; -			if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) +			if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()))  				break;  			shared_ptr<WaitingRef> wref(new WaitingRef {  				.storage = peer.tempStorage,  				.ref = peer.partStorage.ref(dgst), -				.peer = peer,  				.missing = {},  			});  			waiting.push_back(wref); -			peer.channel = wref; +			peer.connection.channel() = wref;  			wref->check(reply);  			break;  		}  		case NetworkProtocol::Header::Type::ChannelAccept: {  			auto dgst = std::get<Digest>(item.value); -			if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && -					std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst) +			if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) && +					std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() < dgst)  				break;  			auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); @@ -685,9 +673,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head  			shared_ptr<WaitingRef> wref(new WaitingRef {  				.storage = peer.tempStorage, -					.ref = pref, -					.peer = peer, -					.missing = {}, +				.ref = pref, +				.missing = {},  			});  			waiting.push_back(wref);  			peer.serviceQueue.emplace_back(*serviceType, wref); @@ -732,8 +719,8 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj  		data.insert(data.end(), part.begin(), part.end());  	} -	if (holds_alternative<unique_ptr<Channel>>(channel)) -		out = std::get<unique_ptr<Channel>>(channel)->encrypt(data); +	if (holds_alternative<unique_ptr<Channel>>(connection.channel())) +		out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);  	else if (secure)  		secureOutQueue.emplace_back(move(data));  	else @@ -784,10 +771,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  	if (!holds_alternative<Identity>(identity))  		return; -	if (holds_alternative<monostate>(channel)) { +	if (holds_alternative<monostate>(connection.channel())) {  		auto req = Channel::generateRequest(tempStorage,  				server.self, std::get<Identity>(identity)); -		channel.emplace<Stored<ChannelRequest>>(req); +		connection.channel().emplace<Stored<ChannelRequest>>(req);  		reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() });  		reply.body(req.ref());  		reply.body(req->data.ref()); @@ -796,13 +783,13 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  			reply.body(sig.ref());  	} -	if (holds_alternative<shared_ptr<WaitingRef>>(channel)) { -		if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) { +	if (holds_alternative<shared_ptr<WaitingRef>>(connection.channel())) { +		if (auto ref = std::get<shared_ptr<WaitingRef>>(connection.channel())->check(reply)) {  			auto req = Stored<ChannelRequest>::load(*ref);  			if (holds_alternative<Identity>(identity) &&  					req->isSignedBy(std::get<Identity>(identity).keyMessage())) {  				if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) { -					channel.emplace<Stored<ChannelAccept>>(*acc); +					connection.channel().emplace<Stored<ChannelAccept>>(*acc);  					reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() });  					reply.body(acc->ref());  					reply.body(acc.value()->data.ref()); @@ -810,10 +797,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  					for (const auto & sig : acc.value()->sigs)  						reply.body(sig.ref());  				} else { -					channel = monostate(); +					connection.channel() = monostate();  				}  			} else { -				channel = monostate(); +				connection.channel() = monostate();  			}  		}  	} @@ -821,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)  void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)  { -	channel.emplace<unique_ptr<Channel>>(move(ch)); +	connection.channel().emplace<unique_ptr<Channel>>(move(ch));  	vector<NetworkProtocol::Header::Item> hitems;  	for (const auto & r : server.self.refs()) @@ -871,11 +858,11 @@ void Server::Peer::trySendOutQueue()  	if (secureOutQueue.empty())  		return; -	if (!holds_alternative<unique_ptr<Channel>>(channel)) +	if (!holds_alternative<unique_ptr<Channel>>(connection.channel()))  		return;  	for (const auto & data : secureOutQueue) { -		auto out = std::get<unique_ptr<Channel>>(channel)->encrypt(data); +		auto out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);  		connection.send(out);  	} diff --git a/src/network.h b/src/network.h index c3a2074..2959adc 100644 --- a/src/network.h +++ b/src/network.h @@ -2,7 +2,6 @@  #include <erebos/network.h> -#include "channel.h"  #include "network/protocol.h"  #include <condition_variable> @@ -51,12 +50,6 @@ struct Server::Peer  		Identity> identity;  	vector<shared_ptr<WaitingRef>> identityUpdates; -	variant<monostate, -		Stored<ChannelRequest>, -		shared_ptr<struct WaitingRef>, -		Stored<ChannelAccept>, -		unique_ptr<Channel>> channel; -  	Storage tempStorage;  	PartialStorage partStorage; @@ -91,31 +84,6 @@ struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>  	void push(const shared_ptr<Server::Peer> &);  }; -class ReplyBuilder -{ -public: -	void header(NetworkProtocol::Header::Item &&); -	void body(const Ref &); - -	const vector<NetworkProtocol::Header::Item> & header() const { return mheader; } -	vector<Object> body() const; - -private: -	vector<NetworkProtocol::Header::Item> mheader; -	vector<Ref> mbody; -}; - -struct WaitingRef -{ -	const Storage storage; -	const PartialRef ref; -	const Server::Peer & peer; -	vector<Digest> missing; - -	optional<Ref> check(); -	optional<Ref> check(ReplyBuilder &); -}; -  struct Server::Priv  {  	Priv(const Head<LocalState> & local, const Identity & self); diff --git a/src/channel.cpp b/src/network/channel.cpp index b317f3d..b317f3d 100644 --- a/src/channel.cpp +++ b/src/network/channel.cpp diff --git a/src/channel.h b/src/network/channel.h index 5f1786e..f932c84 100644 --- a/src/channel.h +++ b/src/network/channel.h @@ -2,7 +2,7 @@  #include <erebos/storage.h> -#include "identity.h" +#include "../identity.h"  #include <atomic>  #include <memory> diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index fb3a5ea..4151bf2 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -24,6 +24,8 @@ struct NetworkProtocol::ConnectionPriv  	mutex cmutex {};  	vector<uint8_t> buffer {}; + +	ChannelState channel = monostate();  }; @@ -202,6 +204,11 @@ void NetworkProtocol::Connection::close()  	p = nullptr;  } +NetworkProtocol::ChannelState & NetworkProtocol::Connection::channel() +{ +	return p->channel; +} +  /******************************************************************************/  /* Header                                                                     */ diff --git a/src/network/protocol.h b/src/network/protocol.h index 4794ba6..88abf67 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -1,5 +1,7 @@  #pragma once +#include "channel.h" +  #include <erebos/storage.h>  #include <netinet/in.h> @@ -45,6 +47,12 @@ public:  	PollResult poll(); +	using ChannelState = variant<monostate, +		Stored<ChannelRequest>, +		shared_ptr<struct WaitingRef>, +		Stored<ChannelAccept>, +		unique_ptr<Channel>>; +  	Connection connect(sockaddr_in6 addr);  	bool recvfrom(vector<uint8_t> & buffer, sockaddr_in6 & addr); @@ -84,6 +92,9 @@ public:  	void close(); +	// temporary: +	ChannelState & channel(); +  private:  	unique_ptr<ConnectionPriv> p;  }; @@ -121,4 +132,28 @@ struct NetworkProtocol::Header  	const vector<Item> items;  }; +class ReplyBuilder +{ +public: +	void header(NetworkProtocol::Header::Item &&); +	void body(const Ref &); + +	const vector<NetworkProtocol::Header::Item> & header() const { return mheader; } +	vector<Object> body() const; + +private: +	vector<NetworkProtocol::Header::Item> mheader; +	vector<Ref> mbody; +}; + +struct WaitingRef +{ +	const Storage storage; +	const PartialRef ref; +	vector<Digest> missing; + +	optional<Ref> check(); +	optional<Ref> check(ReplyBuilder &); +}; +  } |