From be323e65108f4c1d57312a4d26a6a24d3a380c75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 28 May 2023 20:48:07 +0200 Subject: Storage: wait for scheduled watch callbacks --- include/erebos/storage.h | 15 ++++++++ src/storage.cpp | 99 ++++++++++++++++++++++++++++++++---------------- src/storage.h | 22 ++++++++++- 3 files changed, 102 insertions(+), 34 deletions(-) diff --git a/include/erebos/storage.h b/include/erebos/storage.h index ed537eb..3a3073d 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -563,6 +563,9 @@ private: Stored mstored; }; +/** + * Manages registered watch callbacks to Head object using RAII principle. + */ template class WatchedHead : public Head { @@ -590,6 +593,18 @@ public: static_cast &>(*this) = h; return *this; } + + /// Destructor stops the watching started with Head::watch call. + /** + * Once the WatchedHead object is destroyed, no further Head changes + * will trigger the associated callback. + * + * The destructor also ensures that any scheduled callback run + * triggered by a previous change to the head is executed and finished + * before the destructor returns. The exception is when the destructor + * is called directly from the callback itself, in which case the + * destructor returns immediately. + */ ~WatchedHead(); }; diff --git a/src/storage.cpp b/src/storage.cpp index b5688bb..fb3698c 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -39,6 +39,23 @@ using std::system_error; using std::to_string; using std::weak_ptr; +void StorageWatchCallback::schedule(UUID uuid, const Digest & dgst) +{ + scoped_lock lock(runMutex); + scheduled.emplace(uuid, dgst); +} + +void StorageWatchCallback::run() +{ + scoped_lock lock(runMutex); + if (scheduled) { + auto [uuid, dgst] = *scheduled; + scheduled.reset(); // avoid running the callback twice + + callback(uuid, dgst); + } +} + FilesystemStorage::FilesystemStorage(const fs::path & path): root(path) { @@ -278,36 +295,45 @@ int FilesystemStorage::watchHead(UUID type, const function(wid, watcher)); return wid; } void FilesystemStorage::unwatchHead(UUID type, int wid) { - scoped_lock lock(watcherLock); + shared_ptr cb; - if (inotify < 0) - return; + { + scoped_lock lock(watcherLock); - auto range = watchers.equal_range(type); - for (auto it = range.first; it != range.second; it++) { - if (std::get<0>(it->second) == wid) { - watchers.erase(it); - break; - } - } + if (inotify < 0) + return; - if (watchers.find(type) == watchers.end()) { - for (auto it = watchMap.begin(); it != watchMap.end(); it++) { - if (it->second == type) { - if (inotify_rm_watch(inotify, it->first) < 0) - throw system_error(errno, std::generic_category()); - watchMap.erase(it); + auto range = watchers.equal_range(type); + for (auto it = range.first; it != range.second; it++) { + if (it->second->id == wid) { + cb = move(it->second); + watchers.erase(it); break; } } + + if (watchers.find(type) == watchers.end()) { + for (auto it = watchMap.begin(); it != watchMap.end(); it++) { + if (it->second == type) { + if (inotify_rm_watch(inotify, it->first) < 0) + throw system_error(errno, std::generic_category()); + watchMap.erase(it); + break; + } + } + } } + + // Run the last callback if scheduled and not yet executed + if (cb) + cb->run(); } optional> FilesystemStorage::loadKey(const Digest & pubref) const @@ -367,9 +393,7 @@ void FilesystemStorage::inotifyWatch() event = (const struct inotify_event *) ptr; if (event->mask & IN_MOVED_TO) { - vector> callbacks; - optional mbid; - optional mbref; + vector> callbacks; { // Copy relevant callbacks to temporary array, so they @@ -377,17 +401,19 @@ void FilesystemStorage::inotifyWatch() scoped_lock lock(watcherLock); UUID type = watchMap[event->wd]; - if ((mbid = UUID::fromString(event->name))) { - if ((mbref = headRef(type, *mbid))) { + if (auto mbid = UUID::fromString(event->name)) { + if (auto mbref = headRef(type, *mbid)) { auto range = watchers.equal_range(type); - for (auto it = range.first; it != range.second; it++) - callbacks.push_back(std::get<1>(it->second)); + for (auto it = range.first; it != range.second; it++) { + it->second->schedule(*mbid, *mbref); + callbacks.push_back(it->second); + } } } } for (const auto & cb : callbacks) - cb(*mbid, *mbref); + cb->run(); } } } @@ -512,21 +538,30 @@ int MemoryStorage::watchHead(UUID type, const function(wid, watcher)); return wid; } void MemoryStorage::unwatchHead(UUID type, int wid) { - scoped_lock lock(watcherLock); + shared_ptr cb; - auto range = watchers.equal_range(type); - for (auto it = range.first; it != range.second; it++) { - if (std::get<0>(it->second) == wid) { - watchers.erase(it); - break; + { + scoped_lock lock(watcherLock); + + auto range = watchers.equal_range(type); + for (auto it = range.first; it != range.second; it++) { + if (it->second->id == wid) { + cb = move(it->second); + watchers.erase(it); + break; + } } } + + // Run the last callback if scheduled and not yet executed + if (cb) + cb->run(); } optional> MemoryStorage::loadKey(const Digest & digest) const diff --git a/src/storage.h b/src/storage.h index 30e4213..c6b5ed2 100644 --- a/src/storage.h +++ b/src/storage.h @@ -45,6 +45,24 @@ public: virtual void storeKey(const Digest &, const vector &) = 0; }; +class StorageWatchCallback +{ +public: + StorageWatchCallback(int id, const function callback): + id(id), callback(callback) {} + + void schedule(UUID, const Digest &); + void run(); + + const int id; + +private: + const function callback; + + std::recursive_mutex runMutex; + optional> scheduled; +}; + class FilesystemStorage : public StorageBackend { public: @@ -85,7 +103,7 @@ private: int inotify = -1; int inotifyWakeup = -1; int nextWatcherId = 1; - unordered_multimap>> watchers; + unordered_multimap> watchers; unordered_map watchMap; }; @@ -117,7 +135,7 @@ private: mutex watcherLock; int nextWatcherId = 1; - unordered_multimap>> watchers; + unordered_multimap> watchers; }; class ChainStorage : public StorageBackend -- cgit v1.2.3