| author    | Roman Smrž <roman.smrz@seznam.cz>                | 2023-05-28 20:48:07 +0200 |
|-----------|--------------------------------------------------|---------------------------|
| committer | Roman Smrž <roman.smrz@seznam.cz>                | 2023-06-14 22:17:17 +0200 |
| commit    | be323e65108f4c1d57312a4d26a6a24d3a380c75 (patch) |                           |
| tree      | 2ddc81ef079f3d42fb4d4780a5fd1cf0d27ef7b5         |                           |
| parent    | 15ad6ae7bd64d8d7319d75dbbb0827addd22fef2 (diff)  |                           |
Storage: wait for scheduled watch callbacks
-rw-r--r--  include/erebos/storage.h | 15
-rw-r--r--  src/storage.cpp          | 99
-rw-r--r--  src/storage.h            | 22

3 files changed, 102 insertions, 34 deletions
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index ed537eb..3a3073d 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -563,6 +563,9 @@ private:
 	Stored<T> mstored;
 };
 
+/**
+ * Manages registered watch callbacks to Head<T> object using RAII principle.
+ */
 template<class T>
 class WatchedHead : public Head<T>
 {
@@ -590,6 +593,18 @@ public:
 		static_cast<Head<T> &>(*this) = h;
 		return *this;
 	}
+
+	/// Destructor stops the watching started with Head<T>::watch call.
+	/**
+	 * Once the WatchedHead object is destroyed, no further Head<T> changes
+	 * will trigger the associated callback.
+	 *
+	 * The destructor also ensures that any scheduled callback run
+	 * triggered by a previous change to the head is executed and finished
+	 * before the destructor returns. The exception is when the destructor
+	 * is called directly from the callback itself, in which case the
+	 * destructor returns immediately.
+	 */
 	~WatchedHead();
 };
 
diff --git a/src/storage.cpp b/src/storage.cpp
index b5688bb..fb3698c 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -39,6 +39,23 @@ using std::system_error;
 using std::to_string;
 using std::weak_ptr;
 
+void StorageWatchCallback::schedule(UUID uuid, const Digest & dgst)
+{
+	scoped_lock lock(runMutex);
+	scheduled.emplace(uuid, dgst);
+}
+
+void StorageWatchCallback::run()
+{
+	scoped_lock lock(runMutex);
+	if (scheduled) {
+		auto [uuid, dgst] = *scheduled;
+		scheduled.reset(); // avoid running the callback twice
+
+		callback(uuid, dgst);
+	}
+}
+
 FilesystemStorage::FilesystemStorage(const fs::path & path):
 	root(path)
 {
@@ -278,36 +295,45 @@ int FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const D
 		watchMap[wd] = type;
 	}
 
-	watchers.emplace(type, tuple(wid, watcher));
+	watchers.emplace(type, make_shared<StorageWatchCallback>(wid, watcher));
 	return wid;
 }
 
 void FilesystemStorage::unwatchHead(UUID type, int wid)
 {
-	scoped_lock lock(watcherLock);
+	shared_ptr<StorageWatchCallback> cb;
 
-	if (inotify < 0)
-		return;
+	{
+		scoped_lock lock(watcherLock);
 
-	auto range = watchers.equal_range(type);
-	for (auto it = range.first; it != range.second; it++) {
-		if (std::get<0>(it->second) == wid) {
-			watchers.erase(it);
-			break;
-		}
-	}
+		if (inotify < 0)
+			return;
 
-	if (watchers.find(type) == watchers.end()) {
-		for (auto it = watchMap.begin(); it != watchMap.end(); it++) {
-			if (it->second == type) {
-				if (inotify_rm_watch(inotify, it->first) < 0)
-					throw system_error(errno, std::generic_category());
-				watchMap.erase(it);
+		auto range = watchers.equal_range(type);
+		for (auto it = range.first; it != range.second; it++) {
+			if (it->second->id == wid) {
+				cb = move(it->second);
+				watchers.erase(it);
 				break;
 			}
 		}
+
+		if (watchers.find(type) == watchers.end()) {
+			for (auto it = watchMap.begin(); it != watchMap.end(); it++) {
+				if (it->second == type) {
+					if (inotify_rm_watch(inotify, it->first) < 0)
+						throw system_error(errno, std::generic_category());
+					watchMap.erase(it);
+					break;
+				}
+			}
+		}
 	}
+
+	// Run the last callback if scheduled and not yet executed
+	if (cb)
+		cb->run();
 }
 
 optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const
 {
@@ -367,9 +393,7 @@ void FilesystemStorage::inotifyWatch()
 			event = (const struct inotify_event *) ptr;
 
 			if (event->mask & IN_MOVED_TO) {
-				vector<function<void(UUID id, const Digest &)>> callbacks;
-				optional<UUID> mbid;
-				optional<Digest> mbref;
+				vector<shared_ptr<StorageWatchCallback>> callbacks;
 
 				{
 					// Copy relevant callbacks to temporary array, so they
@@ -377,17 +401,19 @@ void FilesystemStorage::inotifyWatch()
 					scoped_lock lock(watcherLock);
 					UUID type = watchMap[event->wd];
 
-					if ((mbid = UUID::fromString(event->name))) {
-						if ((mbref = headRef(type, *mbid))) {
+					if (auto mbid = UUID::fromString(event->name)) {
+						if (auto mbref = headRef(type, *mbid)) {
 							auto range = watchers.equal_range(type);
-							for (auto it = range.first; it != range.second; it++)
-								callbacks.push_back(std::get<1>(it->second));
+							for (auto it = range.first; it != range.second; it++) {
+								it->second->schedule(*mbid, *mbref);
+								callbacks.push_back(it->second);
+							}
 						}
 					}
 				}
 
 				for (const auto & cb : callbacks)
-					cb(*mbid, *mbref);
+					cb->run();
 			}
 		}
 	}
@@ -512,21 +538,30 @@ int MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Diges
 {
 	scoped_lock lock(watcherLock);
 	int wid = nextWatcherId++;
-	watchers.emplace(type, tuple(wid, watcher));
+	watchers.emplace(type, make_shared<StorageWatchCallback>(wid, watcher));
 	return wid;
 }
 
 void MemoryStorage::unwatchHead(UUID type, int wid)
 {
-	scoped_lock lock(watcherLock);
+	shared_ptr<StorageWatchCallback> cb;
 
-	auto range = watchers.equal_range(type);
-	for (auto it = range.first; it != range.second; it++) {
-		if (std::get<0>(it->second) == wid) {
-			watchers.erase(it);
-			break;
+	{
+		scoped_lock lock(watcherLock);
+
+		auto range = watchers.equal_range(type);
+		for (auto it = range.first; it != range.second; it++) {
+			if (it->second->id == wid) {
+				cb = move(it->second);
+				watchers.erase(it);
+				break;
+			}
 		}
 	}
+
+	// Run the last callback if scheduled and not yet executed
+	if (cb)
+		cb->run();
 }
 
 optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const
diff --git a/src/storage.h b/src/storage.h
index 30e4213..c6b5ed2 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -45,6 +45,24 @@ public:
 	virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0;
 };
 
+class StorageWatchCallback
+{
+public:
+	StorageWatchCallback(int id, const function<void(UUID, const Digest &)> callback):
+		id(id), callback(callback) {}
+
+	void schedule(UUID, const Digest &);
+	void run();
+
+	const int id;
+
+private:
+	const function<void(UUID, const Digest &)> callback;
+
+	std::recursive_mutex runMutex;
+	optional<tuple<UUID, Digest>> scheduled;
+};
+
 class FilesystemStorage : public StorageBackend
 {
 public:
@@ -85,7 +103,7 @@ private:
 	int inotify = -1;
 	int inotifyWakeup = -1;
 	int nextWatcherId = 1;
-	unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
+	unordered_multimap<UUID, shared_ptr<StorageWatchCallback>> watchers;
 	unordered_map<int, UUID> watchMap;
 };
 
@@ -117,7 +135,7 @@ private:
 	mutex watcherLock;
 	int nextWatcherId = 1;
-	unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
+	unordered_multimap<UUID, shared_ptr<StorageWatchCallback>> watchers;
 };
 
 class ChainStorage : public StorageBackend
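
Read as a whole, the change turns each registered watcher from a bare (id, function) tuple into a small StorageWatchCallback object: the notifier first records the pending notification with schedule(), and run() later delivers it at most once under a recursive mutex, while unwatchHead removes the callback under the registry lock and then flushes any still-pending run outside it. The sketch below is a minimal, self-contained illustration of that schedule-once/run-once pattern; the names and types (WatchCallback, Registry, a plain std::string payload standing in for UUID/Digest) are simplified assumptions for this example, not the erebos API.

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

// One registered callback plus the latest scheduled, not-yet-delivered payload.
struct WatchCallback {
	WatchCallback(int id, std::function<void(const std::string &)> cb):
		id(id), callback(std::move(cb)) {}

	// Remember the latest value; an older undelivered value is overwritten.
	void schedule(const std::string & value) {
		std::scoped_lock lock(runMutex);
		scheduled = value;
	}

	// Deliver the scheduled value at most once. The recursive mutex lets run()
	// be re-entered from inside the callback; the inner call finds nothing
	// scheduled and returns immediately.
	void run() {
		std::scoped_lock lock(runMutex);
		if (scheduled) {
			auto value = *scheduled;
			scheduled.reset(); // avoid delivering the same value twice
			callback(value);
		}
	}

	const int id;

private:
	std::function<void(const std::string &)> callback;
	std::recursive_mutex runMutex;
	std::optional<std::string> scheduled;
};

class Registry {
public:
	int watch(std::function<void(const std::string &)> cb) {
		std::scoped_lock lock(watcherLock);
		int wid = nextId++;
		watchers.emplace(wid, std::make_shared<WatchCallback>(wid, std::move(cb)));
		return wid;
	}

	// Event source: schedule under the registry lock, run outside it.
	void notify(const std::string & value) {
		std::vector<std::shared_ptr<WatchCallback>> pending;
		{
			std::scoped_lock lock(watcherLock);
			for (auto & entry : watchers) {
				entry.second->schedule(value);
				pending.push_back(entry.second);
			}
		}
		for (const auto & cb : pending)
			cb->run();
	}

	// Remove the callback under the lock, then flush any pending run, so a
	// scheduled delivery finishes before unwatch() returns.
	void unwatch(int wid) {
		std::shared_ptr<WatchCallback> cb;
		{
			std::scoped_lock lock(watcherLock);
			if (auto it = watchers.find(wid); it != watchers.end()) {
				cb = std::move(it->second);
				watchers.erase(it);
			}
		}
		if (cb)
			cb->run(); // deliver the last scheduled value, if any
	}

private:
	std::mutex watcherLock;
	int nextId = 1;
	std::map<int, std::shared_ptr<WatchCallback>> watchers;
};

int main() {
	Registry reg;
	int wid = reg.watch([](const std::string & v) {
		std::cout << "head changed to " << v << "\n";
	});
	reg.notify("ref-1"); // scheduled, then run by the notifier
	reg.unwatch(wid);    // a still-pending run finishes before this returns
}
```

The recursive mutex is what makes the documented WatchedHead destructor behaviour possible: if unregistering happens from inside the callback itself, the nested run() re-enters the mutex, finds nothing scheduled, and returns at once instead of deadlocking.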