summaryrefslogtreecommitdiff
path: root/src/storage.cpp
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2021-01-17 21:59:46 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2021-01-19 21:44:22 +0100
commit2abd6593c8b047d3fd579aa6cc0058bbebe266f8 (patch)
tree44af7148f648538846ef64bed0f8560f77de7e5d /src/storage.cpp
parent1466751256580d8b0e6eea46a8028dcab9742f6b (diff)
Server watching local state head
Diffstat (limited to 'src/storage.cpp')
-rw-r--r--src/storage.cpp128
1 files changed, 124 insertions, 4 deletions
diff --git a/src/storage.cpp b/src/storage.cpp
index 45caadb..4f48c67 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -9,7 +9,10 @@
#include <stdexcept>
#include <thread>
+#include <poll.h>
#include <stdio.h>
+#include <sys/eventfd.h>
+#include <sys/inotify.h>
#include <blake2.h>
#include <zlib.h>
@@ -28,8 +31,10 @@ using std::monostate;
using std::nullopt;
using std::ofstream;
using std::runtime_error;
+using std::scoped_lock;
using std::shared_ptr;
using std::string;
+using std::system_error;
using std::to_string;
FilesystemStorage::FilesystemStorage(const fs::path & path):
@@ -45,6 +50,24 @@ FilesystemStorage::FilesystemStorage(const fs::path & path):
fs::create_directory(path/"heads");
}
+FilesystemStorage::~FilesystemStorage()
+{
+ if (inotifyWakeup >= 0) {
+ uint64_t x = 1;
+ write(inotifyWakeup, &x, sizeof(x));
+ }
+
+ if (watcherThread.joinable())
+ watcherThread.join();
+
+ if (inotify >= 0)
+ close(inotify);
+
+ if (inotifyWakeup >= 0)
+ close(inotifyWakeup);
+
+}
+
bool FilesystemStorage::contains(const Digest & digest) const
{
return fs::exists(objectPath(digest));
@@ -237,6 +260,29 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons
return true;
}
+void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ scoped_lock lock(watcherLock);
+ watchers.emplace(type, watcher);
+
+ if (inotify < 0) {
+ inotify = inotify_init();
+ if (inotify < 0)
+ throw system_error(errno, std::generic_category());
+
+ inotifyWakeup = eventfd(0, 0);
+ if (inotifyWakeup < 0)
+ throw system_error(errno, std::generic_category());
+
+ watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this);
+ }
+
+ int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO);
+ if (fd < 0)
+ throw system_error(errno, std::generic_category());
+ watchMap[fd] = type;
+}
+
optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const
{
fs::path path = keyPath(pubref);
@@ -259,6 +305,55 @@ void FilesystemStorage::storeKey(const Digest & pubref, const vector<uint8_t> &
file.write((const char *) key.data(), key.size());
}
+void FilesystemStorage::inotifyWatch()
+{
+ char buf[4096]
+ __attribute__ ((aligned(__alignof__(struct inotify_event))));
+ const struct inotify_event * event;
+
+ array pfds {
+ pollfd { inotify, POLLIN, 0 },
+ pollfd { inotifyWakeup, POLLIN, 0 },
+ };
+
+ while (true) {
+ int ret = poll(pfds.data(), pfds.size(), -1);
+ if (ret < 0)
+ throw system_error(errno, std::generic_category());
+
+ if (!(pfds[0].revents & POLLIN))
+ break;
+
+ ssize_t len = read(inotify, buf, sizeof buf);
+ if (len < 0) {
+ if (errno == EAGAIN)
+ continue;
+
+ throw system_error(errno, std::generic_category());
+ }
+
+ if (len == 0)
+ break;
+
+ for (char * ptr = buf; ptr < buf + len;
+ ptr += sizeof(struct inotify_event) + event->len) {
+ event = (const struct inotify_event *) ptr;
+
+ if (event->mask & IN_MOVED_TO) {
+ UUID type = watchMap[event->wd];
+ if (auto mbid = UUID::fromString(event->name)) {
+ if (auto mbref = headRef(type, *mbid)) {
+ scoped_lock lock(watcherLock);
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++)
+ it->second(*mbid, *mbref);
+ }
+ }
+ }
+ }
+ }
+}
+
fs::path FilesystemStorage::objectPath(const Digest & digest) const
{
string name(digest);
@@ -267,12 +362,16 @@ fs::path FilesystemStorage::objectPath(const Digest & digest) const
fs::path(name.begin() + 2, name.end());
}
+fs::path FilesystemStorage::headPath(UUID type) const
+{
+ string stype(type);
+ return root/"heads"/fs::path(stype.begin(), stype.end());
+}
+
fs::path FilesystemStorage::headPath(UUID type, UUID id) const
{
- string stype(type), sid(id);
- return root/"heads"/
- fs::path(stype.begin(), stype.end())/
- fs::path(sid.begin(), sid.end());
+ string sid(id);
+ return headPath(type) / fs::path(sid.begin(), sid.end());
}
fs::path FilesystemStorage::keyPath(const Digest & digest) const
@@ -367,6 +466,12 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di
return false;
}
+void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ scoped_lock lock(watcherLock);
+ watchers.emplace(type, watcher);
+}
+
optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const
{
auto it = keys.find(digest);
@@ -436,6 +541,12 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig
return storage->replaceHead(type, id, old, dgst);
}
+void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ parent->watchHead(type, watcher);
+ storage->watchHead(type, watcher);
+}
+
optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const
{
if (auto res = storage->loadKey(digest))
@@ -669,6 +780,15 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std
return nullopt;
}
+void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const
+{
+ p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) {
+ if (id == wid)
+ if (auto r = ref(dgst))
+ watcher(*r);
+ });
+}
+
Digest::Digest(const string & str)
{