summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/storage.h15
-rw-r--r--src/storage.cpp99
-rw-r--r--src/storage.h22
3 files changed, 102 insertions, 34 deletions
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index ed537eb..3a3073d 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -563,6 +563,9 @@ private:
Stored<T> mstored;
};
+/**
+ * Manages registered watch callbacks to Head<T> object using RAII principle.
+ */
template<class T>
class WatchedHead : public Head<T>
{
@@ -590,6 +593,18 @@ public:
static_cast<Head<T> &>(*this) = h;
return *this;
}
+
+ /// Destructor stops the watching started with Head<T>::watch call.
+ /**
+ * Once the WatchedHead object is destroyed, no further Head<T> changes
+ * will trigger the associated callback.
+ *
+ * The destructor also ensures that any scheduled callback run
+ * triggered by a previous change to the head is executed and finished
+ * before the destructor returns. The exception is when the destructor
+ * is called directly from the callback itself, in which case the
+ * destructor returns immediately.
+ */
~WatchedHead();
};
diff --git a/src/storage.cpp b/src/storage.cpp
index b5688bb..fb3698c 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -39,6 +39,23 @@ using std::system_error;
using std::to_string;
using std::weak_ptr;
+void StorageWatchCallback::schedule(UUID uuid, const Digest & dgst)
+{
+ scoped_lock lock(runMutex);
+ scheduled.emplace(uuid, dgst);
+}
+
+void StorageWatchCallback::run()
+{
+ scoped_lock lock(runMutex);
+ if (scheduled) {
+ auto [uuid, dgst] = *scheduled;
+ scheduled.reset(); // avoid running the callback twice
+
+ callback(uuid, dgst);
+ }
+}
+
FilesystemStorage::FilesystemStorage(const fs::path & path):
root(path)
{
@@ -278,36 +295,45 @@ int FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const D
watchMap[wd] = type;
}
- watchers.emplace(type, tuple(wid, watcher));
+ watchers.emplace(type, make_shared<StorageWatchCallback>(wid, watcher));
return wid;
}
void FilesystemStorage::unwatchHead(UUID type, int wid)
{
- scoped_lock lock(watcherLock);
+ shared_ptr<StorageWatchCallback> cb;
- if (inotify < 0)
- return;
+ {
+ scoped_lock lock(watcherLock);
- auto range = watchers.equal_range(type);
- for (auto it = range.first; it != range.second; it++) {
- if (std::get<0>(it->second) == wid) {
- watchers.erase(it);
- break;
- }
- }
+ if (inotify < 0)
+ return;
- if (watchers.find(type) == watchers.end()) {
- for (auto it = watchMap.begin(); it != watchMap.end(); it++) {
- if (it->second == type) {
- if (inotify_rm_watch(inotify, it->first) < 0)
- throw system_error(errno, std::generic_category());
- watchMap.erase(it);
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++) {
+ if (it->second->id == wid) {
+ cb = move(it->second);
+ watchers.erase(it);
break;
}
}
+
+ if (watchers.find(type) == watchers.end()) {
+ for (auto it = watchMap.begin(); it != watchMap.end(); it++) {
+ if (it->second == type) {
+ if (inotify_rm_watch(inotify, it->first) < 0)
+ throw system_error(errno, std::generic_category());
+ watchMap.erase(it);
+ break;
+ }
+ }
+ }
}
+
+ // Run the last callback if scheduled and not yet executed
+ if (cb)
+ cb->run();
}
optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const
@@ -367,9 +393,7 @@ void FilesystemStorage::inotifyWatch()
event = (const struct inotify_event *) ptr;
if (event->mask & IN_MOVED_TO) {
- vector<function<void(UUID id, const Digest &)>> callbacks;
- optional<UUID> mbid;
- optional<Digest> mbref;
+ vector<shared_ptr<StorageWatchCallback>> callbacks;
{
// Copy relevant callbacks to temporary array, so they
@@ -377,17 +401,19 @@ void FilesystemStorage::inotifyWatch()
scoped_lock lock(watcherLock);
UUID type = watchMap[event->wd];
- if ((mbid = UUID::fromString(event->name))) {
- if ((mbref = headRef(type, *mbid))) {
+ if (auto mbid = UUID::fromString(event->name)) {
+ if (auto mbref = headRef(type, *mbid)) {
auto range = watchers.equal_range(type);
- for (auto it = range.first; it != range.second; it++)
- callbacks.push_back(std::get<1>(it->second));
+ for (auto it = range.first; it != range.second; it++) {
+ it->second->schedule(*mbid, *mbref);
+ callbacks.push_back(it->second);
+ }
}
}
}
for (const auto & cb : callbacks)
- cb(*mbid, *mbref);
+ cb->run();
}
}
}
@@ -512,21 +538,30 @@ int MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Diges
{
scoped_lock lock(watcherLock);
int wid = nextWatcherId++;
- watchers.emplace(type, tuple(wid, watcher));
+ watchers.emplace(type, make_shared<StorageWatchCallback>(wid, watcher));
return wid;
}
void MemoryStorage::unwatchHead(UUID type, int wid)
{
- scoped_lock lock(watcherLock);
+ shared_ptr<StorageWatchCallback> cb;
- auto range = watchers.equal_range(type);
- for (auto it = range.first; it != range.second; it++) {
- if (std::get<0>(it->second) == wid) {
- watchers.erase(it);
- break;
+ {
+ scoped_lock lock(watcherLock);
+
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++) {
+ if (it->second->id == wid) {
+ cb = move(it->second);
+ watchers.erase(it);
+ break;
+ }
}
}
+
+ // Run the last callback if scheduled and not yet executed
+ if (cb)
+ cb->run();
}
optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const
diff --git a/src/storage.h b/src/storage.h
index 30e4213..c6b5ed2 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -45,6 +45,24 @@ public:
virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0;
};
+class StorageWatchCallback
+{
+public:
+ StorageWatchCallback(int id, const function<void(UUID, const Digest &)> callback):
+ id(id), callback(callback) {}
+
+ void schedule(UUID, const Digest &);
+ void run();
+
+ const int id;
+
+private:
+ const function<void(UUID, const Digest &)> callback;
+
+ std::recursive_mutex runMutex;
+ optional<tuple<UUID, Digest>> scheduled;
+};
+
class FilesystemStorage : public StorageBackend
{
public:
@@ -85,7 +103,7 @@ private:
int inotify = -1;
int inotifyWakeup = -1;
int nextWatcherId = 1;
- unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
+ unordered_multimap<UUID, shared_ptr<StorageWatchCallback>> watchers;
unordered_map<int, UUID> watchMap;
};
@@ -117,7 +135,7 @@ private:
mutex watcherLock;
int nextWatcherId = 1;
- unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
+ unordered_multimap<UUID, shared_ptr<StorageWatchCallback>> watchers;
};
class ChainStorage : public StorageBackend