diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2021-01-17 21:59:46 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2021-01-19 21:44:22 +0100 | 
| commit | 2abd6593c8b047d3fd579aa6cc0058bbebe266f8 (patch) | |
| tree | 44af7148f648538846ef64bed0f8560f77de7e5d /src | |
| parent | 1466751256580d8b0e6eea46a8028dcab9742f6b (diff) | |
Server watching local state head
Diffstat (limited to 'src')
| -rw-r--r-- | src/network.cpp | 27 | ||||
| -rw-r--r-- | src/network.h | 6 | ||||
| -rw-r--r-- | src/storage.cpp | 128 | ||||
| -rw-r--r-- | src/storage.h | 24 | 
4 files changed, 176 insertions, 9 deletions
| diff --git a/src/network.cpp b/src/network.cpp index 4b79dcb..bd64e07 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -21,8 +21,8 @@ using std::unique_lock;  using namespace erebos; -Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs): -	p(new Priv(self, std::move(svcs))) +Server::Server(const Head<LocalState> & head, vector<unique_ptr<Service>> && svcs): +	p(new Priv(head, *head->identity(), std::move(svcs)))  {  } @@ -139,7 +139,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)  } -Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs): +Server::Priv::Priv(const Head<LocalState> & local, const Identity & self, +		vector<unique_ptr<Service>> && svcs): +	localHead(local),  	self(self),  	services(std::move(svcs))  { @@ -176,6 +178,8 @@ Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs):  	threadListen = thread([this] { doListen(); });  	threadAnnounce = thread([this] { doAnnounce(); }); + +	local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1));  }  Server::Priv::~Priv() @@ -447,6 +451,23 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea  	}  } +void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) +{ +	scoped_lock lock(dataMutex); +	if (auto id = head->identity()) { +		if (id->ref()->digest() != self.ref()->digest()) { +			self = *id; + +			TransportHeader header({ +				{ TransportHeader::Type::AnnounceSelf, *self.ref() } +			}); + +			for (const auto & peer : peers) +				peer->send(header, { **self.ref() }); +		} +	} +} +  void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs) const  {  	vector<uint8_t> data, part, out; diff --git a/src/network.h b/src/network.h index 9b146a9..c02dbc3 100644 --- a/src/network.h +++ b/src/network.h @@ -136,7 +136,8 @@ struct WaitingRef  struct Server::Priv  { -	Priv(const Identity & self, vector<unique_ptr<Service>> && svcs); +	Priv(const Head<LocalState> & local, const Identity & self, +			vector<unique_ptr<Service>> && svcs);  	~Priv();  	void doListen();  	void doAnnounce(); @@ -144,6 +145,8 @@ struct Server::Priv  	Peer & getPeer(const sockaddr_in & paddr);  	void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &); +	void handleLocalHeadChange(const Head<LocalState> &); +  	constexpr static uint16_t discoveryPort { 29665 };  	constexpr static chrono::seconds announceInterval { 60 }; @@ -151,6 +154,7 @@ struct Server::Priv  	condition_variable announceCondvar;  	bool finish = false; +	Head<LocalState> localHead;  	Identity self;  	vector<unique_ptr<Service>> services; diff --git a/src/storage.cpp b/src/storage.cpp index 45caadb..4f48c67 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -9,7 +9,10 @@  #include <stdexcept>  #include <thread> +#include <poll.h>  #include <stdio.h> +#include <sys/eventfd.h> +#include <sys/inotify.h>  #include <blake2.h>  #include <zlib.h> @@ -28,8 +31,10 @@ using std::monostate;  using std::nullopt;  using std::ofstream;  using std::runtime_error; +using std::scoped_lock;  using std::shared_ptr;  using std::string; +using std::system_error;  using std::to_string;  FilesystemStorage::FilesystemStorage(const fs::path & path): @@ -45,6 +50,24 @@ FilesystemStorage::FilesystemStorage(const fs::path & path):  		fs::create_directory(path/"heads");  } +FilesystemStorage::~FilesystemStorage() +{ +	if (inotifyWakeup >= 0) { +		uint64_t x = 1; +		write(inotifyWakeup, &x, sizeof(x)); +	} + +	if (watcherThread.joinable()) +		watcherThread.join(); + +	if (inotify >= 0) +		close(inotify); + +	if (inotifyWakeup >= 0) +		close(inotifyWakeup); + +} +  bool FilesystemStorage::contains(const Digest & digest) const  {  	return fs::exists(objectPath(digest)); @@ -237,6 +260,29 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons  	return true;  } +void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ +	scoped_lock lock(watcherLock); +	watchers.emplace(type, watcher); + +	if (inotify < 0) { +		inotify = inotify_init(); +		if (inotify < 0) +			throw system_error(errno, std::generic_category()); + +		inotifyWakeup = eventfd(0, 0); +		if (inotifyWakeup < 0) +			throw system_error(errno, std::generic_category()); + +		watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this); +	} + +	int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO); +	if (fd < 0) +		throw system_error(errno, std::generic_category()); +	watchMap[fd] = type; +} +  optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const  {  	fs::path path = keyPath(pubref); @@ -259,6 +305,55 @@ void FilesystemStorage::storeKey(const Digest & pubref, const vector<uint8_t> &  	file.write((const char *) key.data(), key.size());  } +void FilesystemStorage::inotifyWatch() +{ +	char buf[4096] +		__attribute__ ((aligned(__alignof__(struct inotify_event)))); +	const struct inotify_event * event; + +	array pfds { +		pollfd { inotify, POLLIN, 0 }, +		pollfd { inotifyWakeup, POLLIN, 0 }, +	}; + +	while (true) { +		int ret = poll(pfds.data(), pfds.size(), -1); +		if (ret < 0) +			throw system_error(errno, std::generic_category()); + +		if (!(pfds[0].revents & POLLIN)) +			break; + +		ssize_t len = read(inotify, buf, sizeof buf); +		if (len < 0) { +			if (errno == EAGAIN) +				continue; + +			throw system_error(errno, std::generic_category()); +		} + +		if (len == 0) +			break; + +		for (char * ptr = buf; ptr < buf + len; +				ptr += sizeof(struct inotify_event) + event->len) { +			event = (const struct inotify_event *) ptr; + +			if (event->mask & IN_MOVED_TO) { +				UUID type = watchMap[event->wd]; +				if (auto mbid = UUID::fromString(event->name)) { +					if (auto mbref = headRef(type, *mbid)) { +						scoped_lock lock(watcherLock); +						auto range = watchers.equal_range(type); +						for (auto it = range.first; it != range.second; it++) +							it->second(*mbid, *mbref); +					} +				} +			} +		} +	} +} +  fs::path FilesystemStorage::objectPath(const Digest & digest) const  {  	string name(digest); @@ -267,12 +362,16 @@ fs::path FilesystemStorage::objectPath(const Digest & digest) const  		fs::path(name.begin() + 2, name.end());  } +fs::path FilesystemStorage::headPath(UUID type) const +{ +	string stype(type); +	return root/"heads"/fs::path(stype.begin(), stype.end()); +} +  fs::path FilesystemStorage::headPath(UUID type, UUID id) const  { -	string stype(type), sid(id); -	return root/"heads"/ -		fs::path(stype.begin(), stype.end())/ -		fs::path(sid.begin(), sid.end()); +	string sid(id); +	return headPath(type) / fs::path(sid.begin(), sid.end());  }  fs::path FilesystemStorage::keyPath(const Digest & digest) const @@ -367,6 +466,12 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di  	return false;  } +void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ +	scoped_lock lock(watcherLock); +	watchers.emplace(type, watcher); +} +  optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const  {  	auto it = keys.find(digest); @@ -436,6 +541,12 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig  	return storage->replaceHead(type, id, old, dgst);  } +void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ +	parent->watchHead(type, watcher); +	storage->watchHead(type, watcher); +} +  optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const  {  	if (auto res = storage->loadKey(digest)) @@ -669,6 +780,15 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std  		return nullopt;  } +void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const +{ +	p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { +		if (id == wid) +			if (auto r = ref(dgst)) +				watcher(*r); +	}); +} +  Digest::Digest(const string & str)  { diff --git a/src/storage.h b/src/storage.h index 18ac1ad..b8d769c 100644 --- a/src/storage.h +++ b/src/storage.h @@ -2,16 +2,21 @@  #include "erebos/storage.h" +#include <functional> +#include <mutex>  #include <unordered_map>  #include <unordered_set>  namespace fs = std::filesystem; +using std::function; +using std::mutex;  using std::optional;  using std::shared_future;  using std::shared_ptr;  using std::unique_ptr;  using std::unordered_map; +using std::unordered_multimap;  using std::unordered_set;  using std::tuple;  using std::variant; @@ -34,6 +39,7 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const = 0;  	virtual UUID storeHead(UUID type, const Digest & dgst) = 0;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0; +	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const = 0;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0; @@ -43,7 +49,7 @@ class FilesystemStorage : public StorageBackend  {  public:  	FilesystemStorage(const fs::path &); -	virtual ~FilesystemStorage() = default; +	virtual ~FilesystemStorage();  	virtual bool contains(const Digest &) const override; @@ -54,20 +60,31 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; +	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override;  private: +	void inotifyWatch(); +  	static constexpr size_t CHUNK = 16384;  	fs::path objectPath(const Digest &) const; +	fs::path headPath(UUID id) const;  	fs::path headPath(UUID id, UUID type) const;  	fs::path keyPath(const Digest &) const;  	FILE * openLockFile(const fs::path & path) const;  	fs::path root; + +	mutex watcherLock; +	std::thread watcherThread; +	int inotify = -1; +	int inotifyWakeup = -1; +	unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers; +	unordered_map<int, UUID> watchMap;  };  class MemoryStorage : public StorageBackend @@ -85,6 +102,7 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; +	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override; @@ -93,6 +111,9 @@ private:  	unordered_map<Digest, vector<uint8_t>> storage;  	unordered_map<UUID, vector<tuple<UUID, Digest>>> heads;  	unordered_map<Digest, vector<uint8_t>> keys; + +	mutex watcherLock; +	unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers;  };  class ChainStorage : public StorageBackend @@ -113,6 +134,7 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; +	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override; |