diff options
| -rw-r--r-- | include/erebos/storage.h | 43 | ||||
| -rw-r--r-- | src/network.cpp | 5 | ||||
| -rw-r--r-- | src/network.h | 2 | ||||
| -rw-r--r-- | src/storage.cpp | 98 | ||||
| -rw-r--r-- | src/storage.h | 22 | 
5 files changed, 141 insertions, 29 deletions
| diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 07e7a4f..10ced57 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -11,6 +11,7 @@  #include <future>  #include <memory>  #include <optional> +#include <stdexcept>  #include <string>  #include <variant>  #include <vector> @@ -99,6 +100,7 @@ public:  protected:  	template<typename T> friend class Head; +	template<typename T> friend class WatchedHead;  	Storage(const std::shared_ptr<const Priv> p): PartialStorage(p) {} @@ -107,7 +109,8 @@ protected:  	static UUID storeHead(UUID type, const Ref & ref);  	static bool replaceHead(UUID type, UUID id, const Ref & old, const Ref & ref);  	static std::optional<Ref> updateHead(UUID type, UUID id, const Ref & old, const std::function<Ref(const Ref &)> &); -	void watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const; +	int watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const; +	void unwatchHead(UUID type, UUID id, int watchId) const;  };  class Digest @@ -472,6 +475,8 @@ void filterAncestors(std::vector<Stored<T>> & xs)  	}  } +template<class T> class WatchedHead; +  template<class T>  class Head  { @@ -489,13 +494,34 @@ public:  	const Ref & ref() const { return mstored.ref(); }  	std::optional<Head<T>> update(const std::function<Stored<T>(const Stored<T> &)> &) const; -	void watch(const std::function<void(const Head<T> &)> &) const; +	WatchedHead<T> watch(const std::function<void(const Head<T> &)> &) const;  private:  	UUID mid;  	Stored<T> mstored;  }; +template<class T> +class WatchedHead : public Head<T> +{ +	friend class Head<T>; +	WatchedHead(const Head<T> & h, int watcherId): +		Head<T>(h), watcherId(watcherId) {} +	WatchedHead(WatchedHead<T> && h): +		Head<T>(h), watcherId(h.watcherId) +	{ h.watcherId = -1; } +	int watcherId; + +public: +	WatchedHead<T> & operator=(const Head<T> & h) { +		if (Head<T>::id() != h.id()) +			throw std::runtime_error("WatchedHead ID mismatch"); +		static_cast<Head<T> &>(*this) = h; +		return *this; +	} +	~WatchedHead(); +}; +  template<typename T>  std::optional<Head<T>> Storage::head(UUID id) const  { @@ -545,11 +571,20 @@ std::optional<Head<T>> Head<T>::update(const std::function<Stored<T>(const Store  }  template<typename T> -void Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const +WatchedHead<T> Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const  { -	stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) { +	int wid = stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) {  		watcher(Head<T>(id, ref));  	}); +	return WatchedHead<T>(*this, wid); +} + +template<class T> +WatchedHead<T>::~WatchedHead() +{ +	if (watcherId >= 0) +		Head<T>::stored().ref().storage().unwatchHead( +				T::headTypeId, Head<T>::id(), watcherId);  }  } diff --git a/src/network.cpp b/src/network.cpp index 74783b4..f33c097 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -201,8 +201,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)  Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,  		vector<unique_ptr<Service>> && svcs): -	localHead(local),  	self(self), +	// Watching needs to start after self is initialized +	localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))),  	services(std::move(svcs))  {  	struct ifaddrs * raddrs; @@ -238,8 +239,6 @@ Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,  	threadListen = thread([this] { doListen(); });  	threadAnnounce = thread([this] { doAnnounce(); }); - -	local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1));  }  Server::Priv::~Priv() diff --git a/src/network.h b/src/network.h index e22d453..6ebd60c 100644 --- a/src/network.h +++ b/src/network.h @@ -157,8 +157,8 @@ struct Server::Priv : enable_shared_from_this<Server::Priv>  	condition_variable announceCondvar;  	bool finish = false; -	Head<LocalState> localHead;  	Identity self; +	WatchedHead<LocalState> localHead;  	vector<unique_ptr<Service>> services;  	thread threadListen; diff --git a/src/storage.cpp b/src/storage.cpp index 607d71a..f50b471 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -253,10 +253,10 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons  	return true;  } -void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +int FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)  {  	scoped_lock lock(watcherLock); -	watchers.emplace(type, watcher); +	int wid = nextWatcherId++;  	if (inotify < 0) {  		inotify = inotify_init(); @@ -270,10 +270,43 @@ void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const  		watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this);  	} -	int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO); -	if (fd < 0) -		throw system_error(errno, std::generic_category()); -	watchMap[fd] = type; +	if (watchers.find(type) == watchers.end()) { +		int wd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO); +		if (wd < 0) +			throw system_error(errno, std::generic_category()); + +		watchMap[wd] = type; +	} +	watchers.emplace(type, tuple(wid, watcher)); + +	return wid; +} + +void FilesystemStorage::unwatchHead(UUID type, int wid) +{ +	scoped_lock lock(watcherLock); + +	if (inotify < 0) +		return; + +	auto range = watchers.equal_range(type); +	for (auto it = range.first; it != range.second; it++) { +		if (std::get<0>(it->second) == wid) { +			watchers.erase(it); +			break; +		} +	} + +	if (watchers.find(type) == watchers.end()) { +		for (auto it = watchMap.begin(); it != watchMap.end(); it++) { +			if (it->second == type) { +				if (inotify_rm_watch(inotify, it->first) < 0) +					throw system_error(errno, std::generic_category()); +				watchMap.erase(it); +				break; +			} +		} +	}  }  optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const @@ -333,13 +366,13 @@ void FilesystemStorage::inotifyWatch()  			event = (const struct inotify_event *) ptr;  			if (event->mask & IN_MOVED_TO) { +				scoped_lock lock(watcherLock);  				UUID type = watchMap[event->wd];  				if (auto mbid = UUID::fromString(event->name)) {  					if (auto mbref = headRef(type, *mbid)) { -						scoped_lock lock(watcherLock);  						auto range = watchers.equal_range(type);  						for (auto it = range.first; it != range.second; it++) -							it->second(*mbid, *mbref); +							std::get<1>(it->second)(*mbid, *mbref);  					}  				}  			} @@ -462,10 +495,25 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di  	return false;  } -void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +int MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)  {  	scoped_lock lock(watcherLock); -	watchers.emplace(type, watcher); +	int wid = nextWatcherId++; +	watchers.emplace(type, tuple(wid, watcher)); +	return wid; +} + +void MemoryStorage::unwatchHead(UUID type, int wid) +{ +	scoped_lock lock(watcherLock); + +	auto range = watchers.equal_range(type); +	for (auto it = range.first; it != range.second; it++) { +		if (std::get<0>(it->second) == wid) { +			watchers.erase(it); +			break; +		} +	}  }  optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const @@ -537,10 +585,25 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig  	return storage->replaceHead(type, id, old, dgst);  } -void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +int ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)  { -	parent->watchHead(type, watcher); -	storage->watchHead(type, watcher); +	scoped_lock lock(watcherLock); +	int wid = nextWatcherId++; + +	int id1 = parent->watchHead(type, watcher); +	int id2 = storage->watchHead(type, watcher); +	watchers.emplace(wid, tuple(id1, id2)); + +	return wid; +} + +void ChainStorage::unwatchHead(UUID type, int wid) +{ +	scoped_lock lock(watcherLock); + +	auto [id1, id2] = watchers.extract(wid).mapped(); +	parent->unwatchHead(type, id1); +	storage->unwatchHead(type, id2);  }  optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const @@ -767,15 +830,20 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std  		return nullopt;  } -void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const +int Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const  { -	p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { +	return p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) {  		if (id == wid)  			if (auto r = ref(dgst))  				watcher(*r);  	});  } +void Storage::unwatchHead(UUID type, UUID, int wid) const +{ +	p->backend->unwatchHead(type, wid); +} +  Digest::Digest(const string & str)  { diff --git a/src/storage.h b/src/storage.h index b8d769c..7ed8cf6 100644 --- a/src/storage.h +++ b/src/storage.h @@ -39,7 +39,8 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const = 0;  	virtual UUID storeHead(UUID type, const Digest & dgst) = 0;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0; -	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0; +	virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0; +	virtual void unwatchHead(UUID type, int watchId) = 0;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const = 0;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0; @@ -60,7 +61,8 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; -	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual void unwatchHead(UUID type, int watchId) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override; @@ -83,7 +85,8 @@ private:  	std::thread watcherThread;  	int inotify = -1;  	int inotifyWakeup = -1; -	unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers; +	int nextWatcherId = 1; +	unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;  	unordered_map<int, UUID> watchMap;  }; @@ -102,7 +105,8 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; -	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual void unwatchHead(UUID type, int watchId) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override; @@ -113,7 +117,8 @@ private:  	unordered_map<Digest, vector<uint8_t>> keys;  	mutex watcherLock; -	unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers; +	int nextWatcherId = 1; +	unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;  };  class ChainStorage : public StorageBackend @@ -134,7 +139,8 @@ public:  	virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;  	virtual UUID storeHead(UUID type, const Digest & dgst) override;  	virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; -	virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; +	virtual void unwatchHead(UUID type, int watchId) override;  	virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;  	virtual void storeKey(const Digest &, const vector<uint8_t> &) override; @@ -142,6 +148,10 @@ public:  private:  	shared_ptr<StorageBackend> storage;  	unique_ptr<ChainStorage> parent; + +	mutex watcherLock; +	int nextWatcherId = 1; +	unordered_map<int, tuple<int, int>> watchers;  };  struct PartialStorage::Priv |