From 2abd6593c8b047d3fd579aa6cc0058bbebe266f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 17 Jan 2021 21:59:46 +0100 Subject: Server watching local state head --- src/storage.cpp | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 4 deletions(-) (limited to 'src/storage.cpp') diff --git a/src/storage.cpp b/src/storage.cpp index 45caadb..4f48c67 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -9,7 +9,10 @@ #include #include +#include #include +#include +#include #include #include @@ -28,8 +31,10 @@ using std::monostate; using std::nullopt; using std::ofstream; using std::runtime_error; +using std::scoped_lock; using std::shared_ptr; using std::string; +using std::system_error; using std::to_string; FilesystemStorage::FilesystemStorage(const fs::path & path): @@ -45,6 +50,24 @@ FilesystemStorage::FilesystemStorage(const fs::path & path): fs::create_directory(path/"heads"); } +FilesystemStorage::~FilesystemStorage() +{ + if (inotifyWakeup >= 0) { + uint64_t x = 1; + write(inotifyWakeup, &x, sizeof(x)); + } + + if (watcherThread.joinable()) + watcherThread.join(); + + if (inotify >= 0) + close(inotify); + + if (inotifyWakeup >= 0) + close(inotifyWakeup); + +} + bool FilesystemStorage::contains(const Digest & digest) const { return fs::exists(objectPath(digest)); @@ -237,6 +260,29 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons return true; } +void FilesystemStorage::watchHead(UUID type, const function & watcher) +{ + scoped_lock lock(watcherLock); + watchers.emplace(type, watcher); + + if (inotify < 0) { + inotify = inotify_init(); + if (inotify < 0) + throw system_error(errno, std::generic_category()); + + inotifyWakeup = eventfd(0, 0); + if (inotifyWakeup < 0) + throw system_error(errno, std::generic_category()); + + watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this); + } + + int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO); + if (fd < 0) + throw system_error(errno, std::generic_category()); + watchMap[fd] = type; +} + optional> FilesystemStorage::loadKey(const Digest & pubref) const { fs::path path = keyPath(pubref); @@ -259,6 +305,55 @@ void FilesystemStorage::storeKey(const Digest & pubref, const vector & file.write((const char *) key.data(), key.size()); } +void FilesystemStorage::inotifyWatch() +{ + char buf[4096] + __attribute__ ((aligned(__alignof__(struct inotify_event)))); + const struct inotify_event * event; + + array pfds { + pollfd { inotify, POLLIN, 0 }, + pollfd { inotifyWakeup, POLLIN, 0 }, + }; + + while (true) { + int ret = poll(pfds.data(), pfds.size(), -1); + if (ret < 0) + throw system_error(errno, std::generic_category()); + + if (!(pfds[0].revents & POLLIN)) + break; + + ssize_t len = read(inotify, buf, sizeof buf); + if (len < 0) { + if (errno == EAGAIN) + continue; + + throw system_error(errno, std::generic_category()); + } + + if (len == 0) + break; + + for (char * ptr = buf; ptr < buf + len; + ptr += sizeof(struct inotify_event) + event->len) { + event = (const struct inotify_event *) ptr; + + if (event->mask & IN_MOVED_TO) { + UUID type = watchMap[event->wd]; + if (auto mbid = UUID::fromString(event->name)) { + if (auto mbref = headRef(type, *mbid)) { + scoped_lock lock(watcherLock); + auto range = watchers.equal_range(type); + for (auto it = range.first; it != range.second; it++) + it->second(*mbid, *mbref); + } + } + } + } + } +} + fs::path FilesystemStorage::objectPath(const Digest & digest) const { string name(digest); @@ -267,12 +362,16 @@ fs::path FilesystemStorage::objectPath(const Digest & digest) const fs::path(name.begin() + 2, name.end()); } +fs::path FilesystemStorage::headPath(UUID type) const +{ + string stype(type); + return root/"heads"/fs::path(stype.begin(), stype.end()); +} + fs::path FilesystemStorage::headPath(UUID type, UUID id) const { - string stype(type), sid(id); - return root/"heads"/ - fs::path(stype.begin(), stype.end())/ - fs::path(sid.begin(), sid.end()); + string sid(id); + return headPath(type) / fs::path(sid.begin(), sid.end()); } fs::path FilesystemStorage::keyPath(const Digest & digest) const @@ -367,6 +466,12 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di return false; } +void MemoryStorage::watchHead(UUID type, const function & watcher) +{ + scoped_lock lock(watcherLock); + watchers.emplace(type, watcher); +} + optional> MemoryStorage::loadKey(const Digest & digest) const { auto it = keys.find(digest); @@ -436,6 +541,12 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig return storage->replaceHead(type, id, old, dgst); } +void ChainStorage::watchHead(UUID type, const function & watcher) +{ + parent->watchHead(type, watcher); + storage->watchHead(type, watcher); +} + optional> ChainStorage::loadKey(const Digest & digest) const { if (auto res = storage->loadKey(digest)) @@ -669,6 +780,15 @@ optional Storage::updateHead(UUID type, UUID id, const Ref & old, const std return nullopt; } +void Storage::watchHead(UUID type, UUID wid, const std::function watcher) const +{ + p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { + if (id == wid) + if (auto r = ref(dgst)) + watcher(*r); + }); +} + Digest::Digest(const string & str) { -- cgit v1.2.3