summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2021-01-17 21:59:46 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2021-01-19 21:44:22 +0100
commit2abd6593c8b047d3fd579aa6cc0058bbebe266f8 (patch)
tree44af7148f648538846ef64bed0f8560f77de7e5d
parent1466751256580d8b0e6eea46a8028dcab9742f6b (diff)
Server watching local state head
-rw-r--r--include/erebos/network.h4
-rw-r--r--include/erebos/storage.h10
-rw-r--r--src/network.cpp27
-rw-r--r--src/network.h6
-rw-r--r--src/storage.cpp128
-rw-r--r--src/storage.h24
6 files changed, 188 insertions, 11 deletions
diff --git a/include/erebos/network.h b/include/erebos/network.h
index a2f989e..8f3debe 100644
--- a/include/erebos/network.h
+++ b/include/erebos/network.h
@@ -1,7 +1,7 @@
#pragma once
-#include <erebos/identity.h>
#include <erebos/service.h>
+#include <erebos/state.h>
#include <functional>
#include <typeinfo>
@@ -11,7 +11,7 @@ namespace erebos {
class Server
{
public:
- Server(const Identity &, std::vector<std::unique_ptr<Service>> &&);
+ Server(const Head<LocalState> &, std::vector<std::unique_ptr<Service>> &&);
~Server();
template<class S> S & svc();
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 34ed9df..29eaa8f 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -107,6 +107,7 @@ protected:
static UUID storeHead(UUID type, const Ref & ref);
static bool replaceHead(UUID type, UUID id, const Ref & old, const Ref & ref);
static std::optional<Ref> updateHead(UUID type, UUID id, const Ref & old, const std::function<Ref(const Ref &)> &);
+ void watchHead(UUID type, UUID id, const std::function<void(const Ref &)>) const;
};
class Digest
@@ -482,6 +483,7 @@ public:
const Ref & ref() const { return mstored.ref(); }
std::optional<Head<T>> update(const std::function<Stored<T>(const Stored<T> &)> &) const;
+ void watch(const std::function<void(const Head<T> &)> &) const;
private:
UUID mid;
@@ -536,6 +538,14 @@ std::optional<Head<T>> Head<T>::update(const std::function<Stored<T>(const Store
return Head<T>(mid, *res);
}
+template<typename T>
+void Head<T>::watch(const std::function<void(const Head<T> &)> & watcher) const
+{
+ stored().ref().storage().watchHead(T::headTypeId, id(), [id = id(), watcher] (const Ref & ref) {
+ watcher(Head<T>(id, ref));
+ });
+}
+
}
namespace std
diff --git a/src/network.cpp b/src/network.cpp
index 4b79dcb..bd64e07 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -21,8 +21,8 @@ using std::unique_lock;
using namespace erebos;
-Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs):
- p(new Priv(self, std::move(svcs)))
+Server::Server(const Head<LocalState> & head, vector<unique_ptr<Service>> && svcs):
+ p(new Priv(head, *head->identity(), std::move(svcs)))
{
}
@@ -139,7 +139,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
}
-Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs):
+Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,
+ vector<unique_ptr<Service>> && svcs):
+ localHead(local),
self(self),
services(std::move(svcs))
{
@@ -176,6 +178,8 @@ Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs):
threadListen = thread([this] { doListen(); });
threadAnnounce = thread([this] { doAnnounce(); });
+
+ local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1));
}
Server::Priv::~Priv()
@@ -447,6 +451,23 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
}
+void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
+{
+ scoped_lock lock(dataMutex);
+ if (auto id = head->identity()) {
+ if (id->ref()->digest() != self.ref()->digest()) {
+ self = *id;
+
+ TransportHeader header({
+ { TransportHeader::Type::AnnounceSelf, *self.ref() }
+ });
+
+ for (const auto & peer : peers)
+ peer->send(header, { **self.ref() });
+ }
+ }
+}
+
void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs) const
{
vector<uint8_t> data, part, out;
diff --git a/src/network.h b/src/network.h
index 9b146a9..c02dbc3 100644
--- a/src/network.h
+++ b/src/network.h
@@ -136,7 +136,8 @@ struct WaitingRef
struct Server::Priv
{
- Priv(const Identity & self, vector<unique_ptr<Service>> && svcs);
+ Priv(const Head<LocalState> & local, const Identity & self,
+ vector<unique_ptr<Service>> && svcs);
~Priv();
void doListen();
void doAnnounce();
@@ -144,6 +145,8 @@ struct Server::Priv
Peer & getPeer(const sockaddr_in & paddr);
void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &);
+ void handleLocalHeadChange(const Head<LocalState> &);
+
constexpr static uint16_t discoveryPort { 29665 };
constexpr static chrono::seconds announceInterval { 60 };
@@ -151,6 +154,7 @@ struct Server::Priv
condition_variable announceCondvar;
bool finish = false;
+ Head<LocalState> localHead;
Identity self;
vector<unique_ptr<Service>> services;
diff --git a/src/storage.cpp b/src/storage.cpp
index 45caadb..4f48c67 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -9,7 +9,10 @@
#include <stdexcept>
#include <thread>
+#include <poll.h>
#include <stdio.h>
+#include <sys/eventfd.h>
+#include <sys/inotify.h>
#include <blake2.h>
#include <zlib.h>
@@ -28,8 +31,10 @@ using std::monostate;
using std::nullopt;
using std::ofstream;
using std::runtime_error;
+using std::scoped_lock;
using std::shared_ptr;
using std::string;
+using std::system_error;
using std::to_string;
FilesystemStorage::FilesystemStorage(const fs::path & path):
@@ -45,6 +50,24 @@ FilesystemStorage::FilesystemStorage(const fs::path & path):
fs::create_directory(path/"heads");
}
+FilesystemStorage::~FilesystemStorage()
+{
+ if (inotifyWakeup >= 0) {
+ uint64_t x = 1;
+ write(inotifyWakeup, &x, sizeof(x));
+ }
+
+ if (watcherThread.joinable())
+ watcherThread.join();
+
+ if (inotify >= 0)
+ close(inotify);
+
+ if (inotifyWakeup >= 0)
+ close(inotifyWakeup);
+
+}
+
bool FilesystemStorage::contains(const Digest & digest) const
{
return fs::exists(objectPath(digest));
@@ -237,6 +260,29 @@ bool FilesystemStorage::replaceHead(UUID type, UUID id, const Digest & old, cons
return true;
}
+void FilesystemStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ scoped_lock lock(watcherLock);
+ watchers.emplace(type, watcher);
+
+ if (inotify < 0) {
+ inotify = inotify_init();
+ if (inotify < 0)
+ throw system_error(errno, std::generic_category());
+
+ inotifyWakeup = eventfd(0, 0);
+ if (inotifyWakeup < 0)
+ throw system_error(errno, std::generic_category());
+
+ watcherThread = std::thread(&FilesystemStorage::inotifyWatch, this);
+ }
+
+ int fd = inotify_add_watch(inotify, headPath(type).c_str(), IN_MOVED_TO);
+ if (fd < 0)
+ throw system_error(errno, std::generic_category());
+ watchMap[fd] = type;
+}
+
optional<vector<uint8_t>> FilesystemStorage::loadKey(const Digest & pubref) const
{
fs::path path = keyPath(pubref);
@@ -259,6 +305,55 @@ void FilesystemStorage::storeKey(const Digest & pubref, const vector<uint8_t> &
file.write((const char *) key.data(), key.size());
}
+void FilesystemStorage::inotifyWatch()
+{
+ char buf[4096]
+ __attribute__ ((aligned(__alignof__(struct inotify_event))));
+ const struct inotify_event * event;
+
+ array pfds {
+ pollfd { inotify, POLLIN, 0 },
+ pollfd { inotifyWakeup, POLLIN, 0 },
+ };
+
+ while (true) {
+ int ret = poll(pfds.data(), pfds.size(), -1);
+ if (ret < 0)
+ throw system_error(errno, std::generic_category());
+
+ if (!(pfds[0].revents & POLLIN))
+ break;
+
+ ssize_t len = read(inotify, buf, sizeof buf);
+ if (len < 0) {
+ if (errno == EAGAIN)
+ continue;
+
+ throw system_error(errno, std::generic_category());
+ }
+
+ if (len == 0)
+ break;
+
+ for (char * ptr = buf; ptr < buf + len;
+ ptr += sizeof(struct inotify_event) + event->len) {
+ event = (const struct inotify_event *) ptr;
+
+ if (event->mask & IN_MOVED_TO) {
+ UUID type = watchMap[event->wd];
+ if (auto mbid = UUID::fromString(event->name)) {
+ if (auto mbref = headRef(type, *mbid)) {
+ scoped_lock lock(watcherLock);
+ auto range = watchers.equal_range(type);
+ for (auto it = range.first; it != range.second; it++)
+ it->second(*mbid, *mbref);
+ }
+ }
+ }
+ }
+ }
+}
+
fs::path FilesystemStorage::objectPath(const Digest & digest) const
{
string name(digest);
@@ -267,12 +362,16 @@ fs::path FilesystemStorage::objectPath(const Digest & digest) const
fs::path(name.begin() + 2, name.end());
}
+fs::path FilesystemStorage::headPath(UUID type) const
+{
+ string stype(type);
+ return root/"heads"/fs::path(stype.begin(), stype.end());
+}
+
fs::path FilesystemStorage::headPath(UUID type, UUID id) const
{
- string stype(type), sid(id);
- return root/"heads"/
- fs::path(stype.begin(), stype.end())/
- fs::path(sid.begin(), sid.end());
+ string sid(id);
+ return headPath(type) / fs::path(sid.begin(), sid.end());
}
fs::path FilesystemStorage::keyPath(const Digest & digest) const
@@ -367,6 +466,12 @@ bool MemoryStorage::replaceHead(UUID type, UUID id, const Digest & old, const Di
return false;
}
+void MemoryStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ scoped_lock lock(watcherLock);
+ watchers.emplace(type, watcher);
+}
+
optional<vector<uint8_t>> MemoryStorage::loadKey(const Digest & digest) const
{
auto it = keys.find(digest);
@@ -436,6 +541,12 @@ bool ChainStorage::replaceHead(UUID type, UUID id, const Digest & old, const Dig
return storage->replaceHead(type, id, old, dgst);
}
+void ChainStorage::watchHead(UUID type, const function<void(UUID id, const Digest &)> & watcher)
+{
+ parent->watchHead(type, watcher);
+ storage->watchHead(type, watcher);
+}
+
optional<vector<uint8_t>> ChainStorage::loadKey(const Digest & digest) const
{
if (auto res = storage->loadKey(digest))
@@ -669,6 +780,15 @@ optional<Ref> Storage::updateHead(UUID type, UUID id, const Ref & old, const std
return nullopt;
}
+void Storage::watchHead(UUID type, UUID wid, const std::function<void(const Ref &)> watcher) const
+{
+ p->backend->watchHead(type, [this, wid, watcher] (UUID id, const Digest & dgst) {
+ if (id == wid)
+ if (auto r = ref(dgst))
+ watcher(*r);
+ });
+}
+
Digest::Digest(const string & str)
{
diff --git a/src/storage.h b/src/storage.h
index 18ac1ad..b8d769c 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -2,16 +2,21 @@
#include "erebos/storage.h"
+#include <functional>
+#include <mutex>
#include <unordered_map>
#include <unordered_set>
namespace fs = std::filesystem;
+using std::function;
+using std::mutex;
using std::optional;
using std::shared_future;
using std::shared_ptr;
using std::unique_ptr;
using std::unordered_map;
+using std::unordered_multimap;
using std::unordered_set;
using std::tuple;
using std::variant;
@@ -34,6 +39,7 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const = 0;
virtual UUID storeHead(UUID type, const Digest & dgst) = 0;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) = 0;
+ virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) = 0;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const = 0;
virtual void storeKey(const Digest &, const vector<uint8_t> &) = 0;
@@ -43,7 +49,7 @@ class FilesystemStorage : public StorageBackend
{
public:
FilesystemStorage(const fs::path &);
- virtual ~FilesystemStorage() = default;
+ virtual ~FilesystemStorage();
virtual bool contains(const Digest &) const override;
@@ -54,20 +60,31 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
+ virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;
private:
+ void inotifyWatch();
+
static constexpr size_t CHUNK = 16384;
fs::path objectPath(const Digest &) const;
+ fs::path headPath(UUID id) const;
fs::path headPath(UUID id, UUID type) const;
fs::path keyPath(const Digest &) const;
FILE * openLockFile(const fs::path & path) const;
fs::path root;
+
+ mutex watcherLock;
+ std::thread watcherThread;
+ int inotify = -1;
+ int inotifyWakeup = -1;
+ unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers;
+ unordered_map<int, UUID> watchMap;
};
class MemoryStorage : public StorageBackend
@@ -85,6 +102,7 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
+ virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;
@@ -93,6 +111,9 @@ private:
unordered_map<Digest, vector<uint8_t>> storage;
unordered_map<UUID, vector<tuple<UUID, Digest>>> heads;
unordered_map<Digest, vector<uint8_t>> keys;
+
+ mutex watcherLock;
+ unordered_multimap<UUID, function<void(UUID id, const Digest &)>> watchers;
};
class ChainStorage : public StorageBackend
@@ -113,6 +134,7 @@ public:
virtual vector<tuple<UUID, Digest>> headRefs(UUID type) const override;
virtual UUID storeHead(UUID type, const Digest & dgst) override;
virtual bool replaceHead(UUID type, UUID id, const Digest & old, const Digest & dgst) override;
+ virtual void watchHead(UUID type, const function<void(UUID id, const Digest &)> &) override;
virtual optional<vector<uint8_t>> loadKey(const Digest &) const override;
virtual void storeKey(const Digest &, const vector<uint8_t> &) override;