summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/storage.h43
-rw-r--r--src/network.cpp5
-rw-r--r--src/network.h2
-rw-r--r--src/storage.cpp98
-rw-r--r--src/storage.h22
5 files changed, 141 insertions, 29 deletions
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 07e7a4f..10ced57 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -11,6 +11,7 @@
#include <future>
#include <memory>
#include <optional>
+#include <stdexcept>
#include <string>
#include <variant>
#include <vector>
@@ -99,6 +100,7 @@ public:
protected:
template<typename T> friend class Head;
+ template<typename T> friend class WatchedHead;
Storage(const std::shared_ptr<const Priv> p): PartialStorage(p) {}
@@ -107,7 +109,8 @@ protected:
static UUID storeHead(UUID type, const Ref & ref);
static bool replaceHead(UUID type, UUID id, const Ref & old, const Ref & ref);
static std::optional<Ref> updateHead(UUID type, UUID id, const Ref & old, const std::function<Ref(const Ref &)> &);
- void watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const;
+ int watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const;
+ void unwatchHead(UUID type, UUID id, int watchId) const;
};
class Digest
@@ -472,6 +475,8 @@ void filterAncestors(std::vector<Stored<T>> & xs)
}
}
+template<class T> class WatchedHead;
+
template<class T>
class Head
{
@@ -489,13 +494,34 @@ public:
const Ref & ref() const { return mstored.ref(); }
std::optional<Head<T>> update(const std::function<Stored<T>(const Stored<T> &)> &) const;
- void watch(const std::function<void(const Head<T> &)> &) const;
+ WatchedHead<T> watch(const std::function<void(const Head<T> &)> &) const;
private:
UUID mid;
Stored<T> mstored;
};
+template<class T>
+class WatchedHead : public Head<T>
+{
+ friend class Head<T>;
+ WatchedHead(const Head<T> & h, int watcherId):
+ Head<T>(h), watcherId(watcherId) {}
+ WatchedHead(WatchedHead<T> && h):
+ Head<T>(h), watcherId(h.watcherId)
+ { h.watcherId = -1; }
+ int watcherId;
+
+public:
+ WatchedHead<T> & operator=(const Head<T> & h) {
+ if (Head<T>::id() != h.id())
+ throw std::runtime_error("WatchedHead ID mismatch");
+ static_cast<Head<T> &>(*this) = h;
+ return *this;
+ }
+ ~WatchedHead();
+};
+
template<typename T>
std::optional<Head<T>> Storage::head(UUID id) const
{
@@ -545,11 +571,20 @@ std::optional<Head<T>> Head<T>::update(const std::function<Stored<T>(const Store
}
template<typename T>
-void Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const
+WatchedHead<T> Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const
{
- stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) {
+ int wid = stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) {
watcher(Head<T>(id, ref));
});
+ return WatchedHead<T>(*this, wid);
+}
+
+template<class T>
+WatchedHead<T>::~WatchedHead()
+{
+ if (watcherId >= 0)
+ Head<T>::stored().ref().storage().unwatchHead(
+ T::headTypeId, Head<T>::id(), watcherId);
}
}
diff --git a/src/network.cpp b/src/network.cpp
index 74783b4..f33c097 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -201,8 +201,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,
vector<unique_ptr<Service>> && svcs):
- localHead(local),
self(self),
+ // Watching needs to start after self is initialized
+ localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))),
services(std::move(svcs))
{
struct ifaddrs * raddrs;
@@ -238,8 +239,6 @@ Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,
threadListen = thread([this] { doListen(); });
threadAnnounce = thread([this] { doAnnounce(); });
-
- local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1));
}
Server::Priv::~Priv()
diff --git a/src/network.h b/src/network.h
index e22d453..6ebd60c 100644
--- a/src/network.h
+++ b/src/network.h
@@ -157,8 +157,8 @@ struct Server::Priv : enable_shared_from_this<Server::Priv>
condition_variable announceCondvar;
bool finish = false;
- Head<LocalState> localHead;
Identity self;
+ WatchedHead<LocalState> localHead;
vector<unique_ptr<Service>> services;
thread threadListen;
diff --git a/src/storage.cpp b/src/storage.cpp
index 607d71a..f50b471 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -253,10 +253,10 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons
return true;
}
-void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+int FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
{
scoped_lock lock(watcherLock);
- watchers.emplace(type, watcher);
+ int wid = nextWatcherId++;
if (inotify < 0) {
inotify = inotify_init();
@@ -270,10 +270,43 @@ void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const
watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this);
}
- int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO);
- if (fd < 0)
- throw system_error(errno, std::generic_category());
- watchMap[fd] = type;
+ if (watchers.find(type) == watchers.end()) {
+ int wd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO);
+ if (wd < 0)
+ throw system_error(errno, std::generic_category());
+
+ watchMap[wd] = type;
+ }
+ watchers.emplace(type, tuple(wid, watcher));
+
+ return wid;
+}
+
+void FilesystemStorage::unwatchHead(UUID type, int wid)
+{
+ scoped_lock lock(watcherLock);
+
+ if (inotify < 0)
+ return;
+
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++) {
+ if (std::get<0>(it->second) == wid) {
+ watchers.erase(it);
+ break;
+ }
+ }
+
+ if (watchers.find(type) == watchers.end()) {
+ for (auto it = watchMap.begin(); it != watchMap.end(); it++) {
+ if (it->second == type) {
+ if (inotify_rm_watch(inotify, it->first) < 0)
+ throw system_error(errno, std::generic_category());
+ watchMap.erase(it);
+ break;
+ }
+ }
+ }
}
optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const
@@ -333,13 +366,13 @@ void FilesystemStorage::inotifyWatch()
event = (const struct inotify_event *) ptr;
if (event->mask & IN_MOVED_TO) {
+ scoped_lock lock(watcherLock);
UUID type = watchMap[event->wd];
if (auto mbid = UUID::fromString(event->name)) {
if (auto mbref = headRef(type, *mbid)) {
- scoped_lock lock(watcherLock);
auto range = watchers.equal_range(type);
for (auto it = range.first; it != range.second; it++)
- it->second(*mbid, *mbref);
+ std::get<1>(it->second)(*mbid, *mbref);
}
}
}
@@ -462,10 +495,25 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di
return false;
}
-void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+int MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
{
scoped_lock lock(watcherLock);
- watchers.emplace(type, watcher);
+ int wid = nextWatcherId++;
+ watchers.emplace(type, tuple(wid, watcher));
+ return wid;
+}
+
+void MemoryStorage::unwatchHead(UUID type, int wid)
+{
+ scoped_lock lock(watcherLock);
+
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++) {
+ if (std::get<0>(it->second) == wid) {
+ watchers.erase(it);
+ break;
+ }
+ }
}
optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const
@@ -537,10 +585,25 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig
return storage->replaceHead(type, id, old, dgst);
}
-void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+int ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
{
- parent->watchHead(type, watcher);
- storage->watchHead(type, watcher);
+ scoped_lock lock(watcherLock);
+ int wid = nextWatcherId++;
+
+ int id1 = parent->watchHead(type, watcher);
+ int id2 = storage->watchHead(type, watcher);
+ watchers.emplace(wid, tuple(id1, id2));
+
+ return wid;
+}
+
+void ChainStorage::unwatchHead(UUID type, int wid)
+{
+ scoped_lock lock(watcherLock);
+
+ auto [id1, id2] = watchers.extract(wid).mapped();
+ parent->unwatchHead(type, id1);
+ storage->unwatchHead(type, id2);
}
optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const
@@ -767,15 +830,20 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std
return nullopt;
}
-void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const
+int Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const
{
- p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) {
+ return p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) {
if (id == wid)
if (auto r = ref(dgst))
watcher(*r);
});
}
+void Storage::unwatchHead(UUID type, UUID, int wid) const
+{
+ p->backend->unwatchHead(type, wid);
+}
+
Digest::Digest(const string & str)
{
diff --git a/src/storage.h b/src/storage.h
index b8d769c..7ed8cf6 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -39,7 +39,8 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const = 0;
virtual UUID storeHead(UUID type, const Digest & dgst) = 0;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0;
- virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0;
+ virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0;
+ virtual void unwatchHead(UUID type, int watchId) = 0;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const = 0;
virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0;
@@ -60,7 +61,8 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
- virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual void unwatchHead(UUID type, int watchId) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;
@@ -83,7 +85,8 @@ private:
std::thread watcherThread;
int inotify = -1;
int inotifyWakeup = -1;
- unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers;
+ int nextWatcherId = 1;
+ unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
unordered_map<int, UUID> watchMap;
};
@@ -102,7 +105,8 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
- virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual void unwatchHead(UUID type, int watchId) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;
@@ -113,7 +117,8 @@ private:
unordered_map<Digest, vector<uint8_t>> keys;
mutex watcherLock;
- unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers;
+ int nextWatcherId = 1;
+ unordered_multimap<UUID, tuple<int, function<void(UUID id, const Digest &)>>> watchers;
};
class ChainStorage : public StorageBackend
@@ -134,7 +139,8 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
- virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual int watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
+ virtual void unwatchHead(UUID type, int watchId) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;
@@ -142,6 +148,10 @@ public:
private:
shared_ptr<StorageBackend> storage;
unique_ptr<ChainStorage> parent;
+
+ mutex watcherLock;
+ int nextWatcherId = 1;
+ unordered_map<int, tuple<int, int>> watchers;
};
struct PartialStorage::Priv