From 5dc467310ddebeae8dcb6262f5499f37382711ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 17 Feb 2021 22:15:27 +0100 Subject: WatchedHead object allowing to stop watching Head --- include/erebos/storage.h | 43 +++++++++++++++++++-- src/network.cpp | 5 +-- src/network.h | 2 +- src/storage.cpp | 98 ++++++++++++++++++++++++++++++++++++++++-------- src/storage.h | 22 ++++++++--- 5 files changed, 141 insertions(+), 29 deletions(-) diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 07e7a4f..10ced57 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,7 @@ public: protected: template friend class Head; + template friend class WatchedHead; Storage(const std::shared_ptr p): PartialStorage(p) {} @@ -107,7 +109,8 @@ protected: static UUID storeHead(UUID type, const Ref & ref); static bool replaceHead(UUID type, UUID id, const Ref & old, const Ref & ref); static std::optional updateHead(UUID type, UUID id, const Ref & old, const std::function &); - void watchHead(UUID type, UUID id, const std::function) const; + int watchHead(UUID type, UUID id, const std::function) const; + void unwatchHead(UUID type, UUID id, int watchId) const; }; class Digest @@ -472,6 +475,8 @@ void filterAncestors(std::vector> & xs) } } +template class WatchedHead; + template class Head { @@ -489,13 +494,34 @@ public: const Ref & ref() const { return mstored.ref(); } std::optional> update(const std::function(const Stored &)> &) const; - void watch(const std::function &)> &) const; + WatchedHead watch(const std::function &)> &) const; private: UUID mid; Stored mstored; }; +template +class WatchedHead : public Head +{ + friend class Head; + WatchedHead(const Head & h, int watcherId): + Head(h), watcherId(watcherId) {} + WatchedHead(WatchedHead && h): + Head(h), watcherId(h.watcherId) + { h.watcherId = -1; } + int watcherId; + +public: + WatchedHead & operator=(const Head & h) { + if (Head::id() != h.id()) + throw std::runtime_error("WatchedHead ID mismatch"); + static_cast &>(*this) = h; + return *this; + } + ~WatchedHead(); +}; + template std::optional> Storage::head(UUID id) const { @@ -545,11 +571,20 @@ std::optional> Head::update(const std::function(const Store } template -void Head::watch(const std::function &)> & watcher) const +WatchedHead Head::watch(const std::function &)> & watcher) const { - stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) { + int wid = stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) { watcher(Head(id, ref)); }); + return WatchedHead(*this, wid); +} + +template +WatchedHead::~WatchedHead() +{ + if (watcherId >= 0) + Head::stored().ref().storage().unwatchHead( + T::headTypeId, Head::id(), watcherId); } } diff --git a/src/network.cpp b/src/network.cpp index 74783b4..f33c097 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -201,8 +201,9 @@ void PeerList::onUpdate(function w) Server::Priv::Priv(const Head & local, const Identity & self, vector> && svcs): - localHead(local), self(self), + // Watching needs to start after self is initialized + localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))), services(std::move(svcs)) { struct ifaddrs * raddrs; @@ -238,8 +239,6 @@ Server::Priv::Priv(const Head & local, const Identity & self, threadListen = thread([this] { doListen(); }); threadAnnounce = thread([this] { doAnnounce(); }); - - local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1)); } Server::Priv::~Priv() diff --git a/src/network.h b/src/network.h index e22d453..6ebd60c 100644 --- a/src/network.h +++ b/src/network.h @@ -157,8 +157,8 @@ struct Server::Priv : enable_shared_from_this condition_variable announceCondvar; bool finish = false; - Head localHead; Identity self; + WatchedHead localHead; vector> services; thread threadListen; diff --git a/src/storage.cpp b/src/storage.cpp index 607d71a..f50b471 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -253,10 +253,10 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons return true; } -void FilesystemStorage::watchHead(UUID type, const function & watcher) +int FilesystemStorage::watchHead(UUID type, const function & watcher) { scoped_lock lock(watcherLock); - watchers.emplace(type, watcher); + int wid = nextWatcherId++; if (inotify < 0) { inotify = inotify_init(); @@ -270,10 +270,43 @@ void FilesystemStorage::watchHead(UUID type, const function(it->second) == wid) { + watchers.erase(it); + break; + } + } + + if (watchers.find(type) == watchers.end()) { + for (auto it = watchMap.begin(); it != watchMap.end(); it++) { + if (it->second == type) { + if (inotify_rm_watch(inotify, it->first) < 0) + throw system_error(errno, std::generic_category()); + watchMap.erase(it); + break; + } + } + } } optional> FilesystemStorage::loadKey(const Digest & pubref) const @@ -333,13 +366,13 @@ void FilesystemStorage::inotifyWatch() event = (const struct inotify_event *) ptr; if (event->mask & IN_MOVED_TO) { + scoped_lock lock(watcherLock); UUID type = watchMap[event->wd]; if (auto mbid = UUID::fromString(event->name)) { if (auto mbref = headRef(type, *mbid)) { - scoped_lock lock(watcherLock); auto range = watchers.equal_range(type); for (auto it = range.first; it != range.second; it++) - it->second(*mbid, *mbref); + std::get<1>(it->second)(*mbid, *mbref); } } } @@ -462,10 +495,25 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di return false; } -void MemoryStorage::watchHead(UUID type, const function & watcher) +int MemoryStorage::watchHead(UUID type, const function & watcher) { scoped_lock lock(watcherLock); - watchers.emplace(type, watcher); + int wid = nextWatcherId++; + watchers.emplace(type, tuple(wid, watcher)); + return wid; +} + +void MemoryStorage::unwatchHead(UUID type, int wid) +{ + scoped_lock lock(watcherLock); + + auto range = watchers.equal_range(type); + for (auto it = range.first; it != range.second; it++) { + if (std::get<0>(it->second) == wid) { + watchers.erase(it); + break; + } + } } optional> MemoryStorage::loadKey(const Digest & digest) const @@ -537,10 +585,25 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig return storage->replaceHead(type, id, old, dgst); } -void ChainStorage::watchHead(UUID type, const function & watcher) +int ChainStorage::watchHead(UUID type, const function & watcher) { - parent->watchHead(type, watcher); - storage->watchHead(type, watcher); + scoped_lock lock(watcherLock); + int wid = nextWatcherId++; + + int id1 = parent->watchHead(type, watcher); + int id2 = storage->watchHead(type, watcher); + watchers.emplace(wid, tuple(id1, id2)); + + return wid; +} + +void ChainStorage::unwatchHead(UUID type, int wid) +{ + scoped_lock lock(watcherLock); + + auto [id1, id2] = watchers.extract(wid).mapped(); + parent->unwatchHead(type, id1); + storage->unwatchHead(type, id2); } optional> ChainStorage::loadKey(const Digest & digest) const @@ -767,15 +830,20 @@ optional Storage::updateHead(UUID type, UUID id, const Ref & old, const std return nullopt; } -void Storage::watchHead(UUID type, UUID wid, const std::function watcher) const +int Storage::watchHead(UUID type, UUID wid, const std::function watcher) const { - p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { + return p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { if (id == wid) if (auto r = ref(dgst)) watcher(*r); }); } +void Storage::unwatchHead(UUID type, UUID, int wid) const +{ + p->backend->unwatchHead(type, wid); +} + Digest::Digest(const string & str) { diff --git a/src/storage.h b/src/storage.h index b8d769c..7ed8cf6 100644 --- a/src/storage.h +++ b/src/storage.h @@ -39,7 +39,8 @@ public: virtual vector> headRefs(UUID type) const = 0; virtual UUID storeHead(UUID type, const Digest & dgst) = 0; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0; - virtual void watchHead(UUID type, const function &) = 0; + virtual int watchHead(UUID type, const function &) = 0; + virtual void unwatchHead(UUID type, int watchId) = 0; virtual optional> loadKey(const Digest &) const = 0; virtual void storeKey(const Digest &, const vector &) = 0; @@ -60,7 +61,8 @@ public: virtual vector> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; - virtual void watchHead(UUID type, const function &) override; + virtual int watchHead(UUID type, const function &) override; + virtual void unwatchHead(UUID type, int watchId) override; virtual optional> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector &) override; @@ -83,7 +85,8 @@ private: std::thread watcherThread; int inotify = -1; int inotifyWakeup = -1; - unordered_multimap> watchers; + int nextWatcherId = 1; + unordered_multimap>> watchers; unordered_map watchMap; }; @@ -102,7 +105,8 @@ public: virtual vector> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; - virtual void watchHead(UUID type, const function &) override; + virtual int watchHead(UUID type, const function &) override; + virtual void unwatchHead(UUID type, int watchId) override; virtual optional> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector &) override; @@ -113,7 +117,8 @@ private: unordered_map> keys; mutex watcherLock; - unordered_multimap> watchers; + int nextWatcherId = 1; + unordered_multimap>> watchers; }; class ChainStorage : public StorageBackend @@ -134,7 +139,8 @@ public: virtual vector> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; - virtual void watchHead(UUID type, const function &) override; + virtual int watchHead(UUID type, const function &) override; + virtual void unwatchHead(UUID type, int watchId) override; virtual optional> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector &) override; @@ -142,6 +148,10 @@ public: private: shared_ptr storage; unique_ptr parent; + + mutex watcherLock; + int nextWatcherId = 1; + unordered_map> watchers; }; struct PartialStorage::Priv -- cgit v1.2.3