diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/network.cpp | 83 | ||||
| -rw-r--r-- | src/network.h | 24 | ||||
| -rw-r--r-- | src/storage.cpp | 34 | ||||
| -rw-r--r-- | src/storage.h | 1 | 
4 files changed, 126 insertions, 16 deletions
| diff --git a/src/network.cpp b/src/network.cpp index 64f9ed3..bd1ea8e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,12 +2,14 @@  #include "identity.h" +#include <algorithm>  #include <cstring>  #include <ifaddrs.h>  #include <net/if.h>  #include <unistd.h> +using std::holds_alternative;  using std::scoped_lock;  using std::unique_lock; @@ -88,12 +90,19 @@ void Server::Priv::doListen()  		if (ret < 0)  			throw std::system_error(errno, std::generic_category()); -		auto peer = getPeer(paddr); +		auto & peer = getPeer(paddr);  		if (auto dec = PartialObject::decodePrefix(peer.partStorage,  				buf.begin(), buf.begin() + ret)) {  			if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { +				auto pos = std::get<1>(*dec); +				while (auto cdec = PartialObject::decodePrefix(peer.partStorage, +							pos, buf.begin() + ret)) { +					peer.partStorage.storeObject(std::get<PartialObject>(*cdec)); +					pos = std::get<1>(*cdec); +				}  				scoped_lock<mutex> hlock(dataMutex);  				handlePacket(peer, *header); +				peer.updateIdentity();  			}  		} @@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr)  	Peer * peer = new Peer {  		.sock = sock,  		.addr = paddr, +		.identity = monostate(),  		.tempStorage = st,  		.partStorage = st.derivePartialStorage(),  		}; @@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)  	for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))  		plaintextRefs.insert(obj.ref.digest()); +	vector<TransportHeader::Item> replyHeaders; +	vector<Object> replyBody; +  	for (auto & item : header.items) {  		switch (item.type) {  		case TransportHeader::Type::Acknowledged: @@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)  			if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {  				if (auto ref = peer.tempStorage.ref(pref.digest())) {  					TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; -					peer.send(TransportHeader({ hitem }), { **ref }); +					replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref }); +					replyBody.push_back(**ref);  				}  			}  			break;  		}  		case TransportHeader::Type::DataResponse: +			if (auto pref = std::get<PartialRef>(item.value)) { +				replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref }); +				for (auto & pwref : waiting) { +					if (auto wref = pwref.lock()) { +						if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != +								wref->missing.end()) { +							if (wref->check(&replyHeaders)) +								pwref.reset(); +						} +					} +				} +				waiting.erase(std::remove_if(waiting.begin(), waiting.end(), +							[](auto & wref) { return wref.expired(); }), waiting.end()); +			}  			break; -		case TransportHeader::Type::AnnounceSelf: +		case TransportHeader::Type::AnnounceSelf: { +			auto pref = std::get<PartialRef>(item.value); +			if (pref.digest() == self.ref()->digest()) +				break; + +			if (holds_alternative<monostate>(peer.identity)) +				replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + +			shared_ptr<WaitingRef> wref(new WaitingRef { +				.storage = peer.tempStorage, +				.ref = pref, +				.peer = peer, +				.missing = {}, +			}); +			waiting.push_back(wref); +			peer.identity = wref; +			wref->check(&replyHeaders);  			break; +		}  		case TransportHeader::Type::AnnounceUpdate:  			break; @@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)  		}  	} + +	if (!replyHeaders.empty()) +		peer.send(TransportHeader(replyHeaders), replyBody);  } -void Peer::send(const TransportHeader & header, const vector<Object> & objs) +void Peer::send(const TransportHeader & header, const vector<Object> & objs) const  {  	vector<uint8_t> data, part; @@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector<Object> & objs)  			(sockaddr *) &addr, sizeof(addr));  } +void Peer::updateIdentity() +{ +	if (holds_alternative<shared_ptr<WaitingRef>>(identity)) +		if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check()) +			if (auto id = Identity::load(*ref)) +				identity.emplace<Identity>(*id); +} + + +optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request) +{ +	if (auto r = storage.ref(ref.digest())) +		return *r; + +	auto res = storage.copy(ref); +	if (auto r = std::get_if<Ref>(&res)) +		return *r; + +	missing = std::get<vector<Digest>>(res); +	if (request) +		for (const auto & d : missing) +			request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + +	return nullopt; +} + +  optional<TransportHeader> TransportHeader::load(const PartialRef & ref)  {  	return load(*ref); diff --git a/src/network.h b/src/network.h index af202f6..bb32323 100644 --- a/src/network.h +++ b/src/network.h @@ -10,13 +10,16 @@  #include <netinet/in.h>  using std::condition_variable; +using std::monostate;  using std::mutex;  using std::optional; +using std::shared_ptr;  using std::string;  using std::thread;  using std::unique_ptr;  using std::variant;  using std::vector; +using std::weak_ptr;  namespace chrono = std::chrono;  using chrono::steady_clock; @@ -25,13 +28,21 @@ namespace erebos {  struct Peer  { +	Peer(const Peer &) = delete; +	Peer & operator=(const Peer &) = delete; +  	const int sock;  	const sockaddr_in addr; +	variant<monostate, +		shared_ptr<struct WaitingRef>, +		Identity> identity; +  	Storage tempStorage;  	PartialStorage partStorage; -	void send(const struct TransportHeader &, const vector<Object> &); +	void send(const struct TransportHeader &, const vector<Object> &) const; +	void updateIdentity();  };  struct TransportHeader @@ -61,6 +72,16 @@ struct TransportHeader  	const vector<Item> items;  }; +struct WaitingRef +{ +	const Storage storage; +	const PartialRef ref; +	const Peer & peer; +	vector<Digest> missing; + +	optional<Ref> check(vector<TransportHeader::Item> * request = nullptr); +}; +  struct Server::Priv  {  	Priv(const Identity & self); @@ -84,6 +105,7 @@ struct Server::Priv  	vector<unique_ptr<Peer>> peers;  	vector<struct TransportHeader> outgoing; +	vector<weak_ptr<WaitingRef>> waiting;  	int sock;  	vector<in_addr> bcastAddresses; diff --git a/src/storage.cpp b/src/storage.cpp index 608f82b..6b2e4f8 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -316,6 +316,19 @@ optional<Ref> Storage::ref(const Digest & digest) const  	return Ref::create(*this, digest);  } +Digest PartialStorage::Priv::storeBytes(const vector<uint8_t> & content) const +{ +	array<uint8_t, Digest::size> arr; +	int ret = blake2b(arr.data(), content.data(), nullptr, +			Digest::size, content.size(), 0); +	if (ret != 0) +		throw runtime_error("failed to compute digest"); + +	Digest digest(arr); +	backend->storeBytes(digest, content); +	return digest; +} +  optional<vector<uint8_t>> PartialStorage::Priv::loadBytes(const Digest & digest) const  {  	auto ocontent = backend->loadBytes(digest); @@ -339,6 +352,15 @@ optional<PartialObject> PartialStorage::loadObject(const Digest & digest) const  	return nullopt;  } +PartialRef PartialStorage::storeObject(const PartialObject & obj) const +{ return ref(p->storeBytes(obj.encode())); } + +PartialRef PartialStorage::storeObject(const PartialRecord & val) const +{ return storeObject(PartialObject(val)); } + +PartialRef PartialStorage::storeObject(const Blob & val) const +{ return storeObject(PartialObject(val)); } +  optional<Object> Storage::loadObject(const Digest & digest) const  {  	if (auto content = p->loadBytes(digest)) @@ -380,17 +402,7 @@ optional<Digest> Storage::Priv::copy(const ObjectT<S> & pobj, vector<Digest> * m  	if (fail)  		return nullopt; -	auto content = pobj.encode(); - -	array<uint8_t, Digest::size> arr; -	int ret = blake2b(arr.data(), content.data(), nullptr, -			Digest::size, content.size(), 0); -	if (ret != 0) -		throw runtime_error("failed to compute digest"); - -	Digest digest(arr); -	backend->storeBytes(digest, content); -	return digest; +	return storeBytes(pobj.encode());  }  variant<Ref, vector<Digest>> Storage::copy(const PartialRef & pref) const diff --git a/src/storage.h b/src/storage.h index 9e22a4a..86dc48f 100644 --- a/src/storage.h +++ b/src/storage.h @@ -102,6 +102,7 @@ struct Storage::Priv  {  	shared_ptr<StorageBackend> backend; +	Digest storeBytes(const vector<uint8_t> &) const;  	optional<vector<uint8_t>> loadBytes(const Digest & digest) const;  	template<class S> |