diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2021-01-17 21:59:46 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2021-01-19 21:44:22 +0100 |
commit | 2abd6593c8b047d3fd579aa6cc0058bbebe266f8 (patch) | |
tree | 44af7148f648538846ef64bed0f8560f77de7e5d | |
parent | 1466751256580d8b0e6eea46a8028dcab9742f6b (diff) |
Server watching local state head
-rw-r--r-- | include/erebos/network.h | 4 | ||||
-rw-r--r-- | include/erebos/storage.h | 10 | ||||
-rw-r--r-- | src/network.cpp | 27 | ||||
-rw-r--r-- | src/network.h | 6 | ||||
-rw-r--r-- | src/storage.cpp | 128 | ||||
-rw-r--r-- | src/storage.h | 24 |
6 files changed, 188 insertions, 11 deletions
diff --git a/include/erebos/network.h b/include/erebos/network.h index a2f989e..8f3debe 100644 --- a/include/erebos/network.h +++ b/include/erebos/network.h @@ -1,7 +1,7 @@ #pragma once -#include <erebos/identity.h> #include <erebos/service.h> +#include <erebos/state.h> #include <functional> #include <typeinfo> @@ -11,7 +11,7 @@ namespace erebos { class Server { public: - Server(const Identity &, std::vector<std::unique_ptr<Service>> &&); + Server(const Head<LocalState> &, std::vector<std::unique_ptr<Service>> &&); ~Server(); template<class S> S & svc(); diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 34ed9df..29eaa8f 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -107,6 +107,7 @@ protected: static UUID storeHead(UUID type, const Ref & ref); static bool replaceHead(UUID type, UUID id, const Ref & old, const Ref & ref); static std::optional<Ref> updateHead(UUID type, UUID id, const Ref & old, const std::function<Ref(const Ref &)> &); + void watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const; }; class Digest @@ -482,6 +483,7 @@ public: const Ref & ref() const { return mstored.ref(); } std::optional<Head<T>> update(const std::function<Stored<T>(const Stored<T> &)> &) const; + void watch(const std::function<void(const Head<T> &)> &) const; private: UUID mid; @@ -536,6 +538,14 @@ std::optional<Head<T>> Head<T>::update(const std::function<Stored<T>(const Store return Head<T>(mid, *res); } +template<typename T> +void Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const +{ + stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) { + watcher(Head<T>(id, ref)); + }); +} + } namespace std diff --git a/src/network.cpp b/src/network.cpp index 4b79dcb..bd64e07 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -21,8 +21,8 @@ using std::unique_lock; using namespace erebos; -Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs): - p(new Priv(self, std::move(svcs))) +Server::Server(const Head<LocalState> & head, vector<unique_ptr<Service>> && svcs): + p(new Priv(head, *head->identity(), std::move(svcs))) { } @@ -139,7 +139,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w) } -Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs): +Server::Priv::Priv(const Head<LocalState> & local, const Identity & self, + vector<unique_ptr<Service>> && svcs): + localHead(local), self(self), services(std::move(svcs)) { @@ -176,6 +178,8 @@ Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs): threadListen = thread([this] { doListen(); }); threadAnnounce = thread([this] { doAnnounce(); }); + + local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1)); } Server::Priv::~Priv() @@ -447,6 +451,23 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea } } +void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) +{ + scoped_lock lock(dataMutex); + if (auto id = head->identity()) { + if (id->ref()->digest() != self.ref()->digest()) { + self = *id; + + TransportHeader header({ + { TransportHeader::Type::AnnounceSelf, *self.ref() } + }); + + for (const auto & peer : peers) + peer->send(header, { **self.ref() }); + } + } +} + void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs) const { vector<uint8_t> data, part, out; diff --git a/src/network.h b/src/network.h index 9b146a9..c02dbc3 100644 --- a/src/network.h +++ b/src/network.h @@ -136,7 +136,8 @@ struct WaitingRef struct Server::Priv { - Priv(const Identity & self, vector<unique_ptr<Service>> && svcs); + Priv(const Head<LocalState> & local, const Identity & self, + vector<unique_ptr<Service>> && svcs); ~Priv(); void doListen(); void doAnnounce(); @@ -144,6 +145,8 @@ struct Server::Priv Peer & getPeer(const sockaddr_in & paddr); void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &); + void handleLocalHeadChange(const Head<LocalState> &); + constexpr static uint16_t discoveryPort { 29665 }; constexpr static chrono::seconds announceInterval { 60 }; @@ -151,6 +154,7 @@ struct Server::Priv condition_variable announceCondvar; bool finish = false; + Head<LocalState> localHead; Identity self; vector<unique_ptr<Service>> services; diff --git a/src/storage.cpp b/src/storage.cpp index 45caadb..4f48c67 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -9,7 +9,10 @@ #include <stdexcept> #include <thread> +#include <poll.h> #include <stdio.h> +#include <sys/eventfd.h> +#include <sys/inotify.h> #include <blake2.h> #include <zlib.h> @@ -28,8 +31,10 @@ using std::monostate; using std::nullopt; using std::ofstream; using std::runtime_error; +using std::scoped_lock; using std::shared_ptr; using std::string; +using std::system_error; using std::to_string; FilesystemStorage::FilesystemStorage(const fs::path & path): @@ -45,6 +50,24 @@ FilesystemStorage::FilesystemStorage(const fs::path & path): fs::create_directory(path/"heads"); } +FilesystemStorage::~FilesystemStorage() +{ + if (inotifyWakeup >= 0) { + uint64_t x = 1; + write(inotifyWakeup, &x, sizeof(x)); + } + + if (watcherThread.joinable()) + watcherThread.join(); + + if (inotify >= 0) + close(inotify); + + if (inotifyWakeup >= 0) + close(inotifyWakeup); + +} + bool FilesystemStorage::contains(const Digest & digest) const { return fs::exists(objectPath(digest)); @@ -237,6 +260,29 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons return true; } +void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ + scoped_lock lock(watcherLock); + watchers.emplace(type, watcher); + + if (inotify < 0) { + inotify = inotify_init(); + if (inotify < 0) + throw system_error(errno, std::generic_category()); + + inotifyWakeup = eventfd(0, 0); + if (inotifyWakeup < 0) + throw system_error(errno, std::generic_category()); + + watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this); + } + + int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO); + if (fd < 0) + throw system_error(errno, std::generic_category()); + watchMap[fd] = type; +} + optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const { fs::path path = keyPath(pubref); @@ -259,6 +305,55 @@ void FilesystemStorage::storeKey(const Digest & pubref, const vector<uint8_t> & file.write((const char *) key.data(), key.size()); } +void FilesystemStorage::inotifyWatch() +{ + char buf[4096] + __attribute__ ((aligned(__alignof__(struct inotify_event)))); + const struct inotify_event * event; + + array pfds { + pollfd { inotify, POLLIN, 0 }, + pollfd { inotifyWakeup, POLLIN, 0 }, + }; + + while (true) { + int ret = poll(pfds.data(), pfds.size(), -1); + if (ret < 0) + throw system_error(errno, std::generic_category()); + + if (!(pfds[0].revents & POLLIN)) + break; + + ssize_t len = read(inotify, buf, sizeof buf); + if (len < 0) { + if (errno == EAGAIN) + continue; + + throw system_error(errno, std::generic_category()); + } + + if (len == 0) + break; + + for (char * ptr = buf; ptr < buf + len; + ptr += sizeof(struct inotify_event) + event->len) { + event = (const struct inotify_event *) ptr; + + if (event->mask & IN_MOVED_TO) { + UUID type = watchMap[event->wd]; + if (auto mbid = UUID::fromString(event->name)) { + if (auto mbref = headRef(type, *mbid)) { + scoped_lock lock(watcherLock); + auto range = watchers.equal_range(type); + for (auto it = range.first; it != range.second; it++) + it->second(*mbid, *mbref); + } + } + } + } + } +} + fs::path FilesystemStorage::objectPath(const Digest & digest) const { string name(digest); @@ -267,12 +362,16 @@ fs::path FilesystemStorage::objectPath(const Digest & digest) const fs::path(name.begin() + 2, name.end()); } +fs::path FilesystemStorage::headPath(UUID type) const +{ + string stype(type); + return root/"heads"/fs::path(stype.begin(), stype.end()); +} + fs::path FilesystemStorage::headPath(UUID type, UUID id) const { - string stype(type), sid(id); - return root/"heads"/ - fs::path(stype.begin(), stype.end())/ - fs::path(sid.begin(), sid.end()); + string sid(id); + return headPath(type) / fs::path(sid.begin(), sid.end()); } fs::path FilesystemStorage::keyPath(const Digest & digest) const @@ -367,6 +466,12 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di return false; } +void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ + scoped_lock lock(watcherLock); + watchers.emplace(type, watcher); +} + optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const { auto it = keys.find(digest); @@ -436,6 +541,12 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig return storage->replaceHead(type, id, old, dgst); } +void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher) +{ + parent->watchHead(type, watcher); + storage->watchHead(type, watcher); +} + optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const { if (auto res = storage->loadKey(digest)) @@ -669,6 +780,15 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std return nullopt; } +void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const +{ + p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) { + if (id == wid) + if (auto r = ref(dgst)) + watcher(*r); + }); +} + Digest::Digest(const string & str) { diff --git a/src/storage.h b/src/storage.h index 18ac1ad..b8d769c 100644 --- a/src/storage.h +++ b/src/storage.h @@ -2,16 +2,21 @@ #include "erebos/storage.h" +#include <functional> +#include <mutex> #include <unordered_map> #include <unordered_set> namespace fs = std::filesystem; +using std::function; +using std::mutex; using std::optional; using std::shared_future; using std::shared_ptr; using std::unique_ptr; using std::unordered_map; +using std::unordered_multimap; using std::unordered_set; using std::tuple; using std::variant; @@ -34,6 +39,7 @@ public: virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const = 0; virtual UUID storeHead(UUID type, const Digest & dgst) = 0; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0; + virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0; virtual optional<vector<uint8_t>> loadKey(const Digest &) const = 0; virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0; @@ -43,7 +49,7 @@ class FilesystemStorage : public StorageBackend { public: FilesystemStorage(const fs::path &); - virtual ~FilesystemStorage() = default; + virtual ~FilesystemStorage(); virtual bool contains(const Digest &) const override; @@ -54,20 +60,31 @@ public: virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; + virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; virtual optional<vector<uint8_t>> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector<uint8_t> &) override; private: + void inotifyWatch(); + static constexpr size_t CHUNK = 16384; fs::path objectPath(const Digest &) const; + fs::path headPath(UUID id) const; fs::path headPath(UUID id, UUID type) const; fs::path keyPath(const Digest &) const; FILE * openLockFile(const fs::path & path) const; fs::path root; + + mutex watcherLock; + std::thread watcherThread; + int inotify = -1; + int inotifyWakeup = -1; + unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers; + unordered_map<int, UUID> watchMap; }; class MemoryStorage : public StorageBackend @@ -85,6 +102,7 @@ public: virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; + virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; virtual optional<vector<uint8_t>> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector<uint8_t> &) override; @@ -93,6 +111,9 @@ private: unordered_map<Digest, vector<uint8_t>> storage; unordered_map<UUID, vector<tuple<UUID, Digest>>> heads; unordered_map<Digest, vector<uint8_t>> keys; + + mutex watcherLock; + unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers; }; class ChainStorage : public StorageBackend @@ -113,6 +134,7 @@ public: virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override; virtual UUID storeHead(UUID type, const Digest & dgst) override; virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override; + virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override; virtual optional<vector<uint8_t>> loadKey(const Digest &) const override; virtual void storeKey(const Digest &, const vector<uint8_t> &) override; |