From c3d6046b25ef0786b8d2919dfa9db4eb05114501 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 21 Feb 2021 22:16:21 +0100 Subject: Sync service --- include/erebos/service.h | 2 ++ include/erebos/state.h | 5 +++- include/erebos/storage.h | 8 +++-- include/erebos/sync.h | 32 ++++++++++++++++++++ src/CMakeLists.txt | 1 + src/service.cpp | 4 +++ src/state.cpp | 34 +++++++++++++++++++-- src/sync.cpp | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 157 insertions(+), 6 deletions(-) create mode 100644 include/erebos/sync.h create mode 100644 src/sync.cpp diff --git a/include/erebos/service.h b/include/erebos/service.h index cec5ea0..ba06495 100644 --- a/include/erebos/service.h +++ b/include/erebos/service.h @@ -32,6 +32,8 @@ public: virtual UUID uuid() const = 0; virtual void handle(Context &) = 0; + + virtual void serverStarted(const class Server &); }; } diff --git a/include/erebos/state.h b/include/erebos/state.h index 543e03c..b1567b0 100644 --- a/include/erebos/state.h +++ b/include/erebos/state.h @@ -29,6 +29,9 @@ public: template LocalState shared(const Storage & st, const T & x) { return updateShared(T::sharedTypeId, x.store(st)); } + vector sharedRefs() const; + LocalState sharedRefAdd(const Ref &) const; + private: vector lookupShared(UUID) const; LocalState updateShared(UUID, const vector &) const; @@ -47,7 +50,7 @@ template LocalState LocalState::shared(const vector> & v) const { vector refs; - for (const auto x : v) + for (const auto & x : v) refs.push_back(x.ref()); return updateShared(T::sharedTypeId, refs); } diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 75b8139..7ec73ab 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -507,12 +507,16 @@ class WatchedHead : public Head friend class Head; WatchedHead(const Head & h, int watcherId): Head(h), watcherId(watcherId) {} + int watcherId; + +public: WatchedHead(WatchedHead && h): Head(h), watcherId(h.watcherId) { h.watcherId = -1; } - int watcherId; -public: + WatchedHead & operator=(WatchedHead && h) + { watcherId = h.watcherId; h.watcherId = -1; return *this; } + WatchedHead & operator=(const Head & h) { if (Head::id() != h.id()) throw std::runtime_error("WatchedHead ID mismatch"); diff --git a/include/erebos/sync.h b/include/erebos/sync.h new file mode 100644 index 0000000..0b9ed9a --- /dev/null +++ b/include/erebos/sync.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include + +#include +#include + +namespace erebos { + +class SyncService : public Service +{ +public: + SyncService(); + virtual ~SyncService(); + + UUID uuid() const override; + void handle(Context &) override; + + void serverStarted(const class Server &) override; + +private: + void peerWatcher(size_t, const class Peer *); + void localStateWatcher(const Head &); + + const class Server * server; + std::mutex headMutex; + std::optional> watchedHead; +}; + +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d8a076a..61f491f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,7 @@ add_library(erebos service state storage + sync time uuid ) diff --git a/src/service.cpp b/src/service.cpp index c731990..14ff665 100644 --- a/src/service.cpp +++ b/src/service.cpp @@ -33,3 +33,7 @@ void Service::Context::local(const LocalState & ls) { p->local = p->local.ref().storage().store(ls); } + +void Service::serverStarted(const class Server &) +{ +} diff --git a/src/state.cpp b/src/state.cpp index 8790dfc..6e39b5e 100644 --- a/src/state.cpp +++ b/src/state.cpp @@ -78,13 +78,41 @@ vector LocalState::lookupShared(UUID type) const return res; } +vector LocalState::sharedRefs() const +{ + vector refs; + for (const auto & x : p->shared) + refs.push_back(x.ref()); + return refs; +} + +LocalState LocalState::sharedRefAdd(const Ref & ref) const +{ + const Storage * st; + if (p->shared.size() > 0) + st = &p->shared[0].ref().storage(); + else if (p->identity) + st = &p->identity->ref()->storage(); + else + st = &ref.storage(); + + LocalState ret; + ret.p->identity = p->identity; + ret.p->shared = p->shared; + ret.p->shared.push_back(SharedState(ref).store(*st)); + filterAncestors(ret.p->shared); + return ret; +} + LocalState LocalState::updateShared(UUID type, const vector & xs) const { const Storage * st; - if (xs.size() > 0) - st = &xs[0].storage(); - else if (p->shared.size() > 0) + if (p->shared.size() > 0) st = &p->shared[0].ref().storage(); + else if (p->identity) + st = &p->identity->ref()->storage(); + else if (xs.size() > 0) + st = &xs[0].storage(); else return *this; diff --git a/src/sync.cpp b/src/sync.cpp new file mode 100644 index 0000000..37f7ca9 --- /dev/null +++ b/src/sync.cpp @@ -0,0 +1,77 @@ +#include + +#include + +using namespace erebos; + +using std::scoped_lock; + +static const UUID myUUID("a4f538d0-4e50-4082-8e10-7e3ec2af175d"); + +SyncService::SyncService() = default; +SyncService::~SyncService() = default; + +UUID SyncService::uuid() const +{ + return myUUID; +} + +void SyncService::handle(Context & ctx) +{ + auto pid = ctx.peer().identity(); + if (!pid) + return; + + const auto & powner = pid->finalOwner(); + const auto & owner = ctx.peer().server().identity().finalOwner(); + + if (!powner.sameAs(owner)) + return; + + ctx.local( + ctx.local()->sharedRefAdd(ctx.ref()) + ); +} + +void SyncService::serverStarted(const Server & s) +{ + server = &s; + server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this, + std::placeholders::_1, std::placeholders::_2)); + watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this, + std::placeholders::_1)); +} + +void SyncService::peerWatcher(size_t, const Peer * peer) +{ + if (peer && peer->identity()->finalOwner().sameAs( + server->identity().finalOwner())) { + scoped_lock lock(headMutex); + for (const auto & r : (*watchedHead)->sharedRefs()) + peer->send(myUUID, r); + } +} + +void SyncService::localStateWatcher(const Head & head) +{ + scoped_lock lock(headMutex); + + bool same = head->sharedRefs().size() == + (*watchedHead)->sharedRefs().size(); + if (same) { + for (size_t i = 0; i < head->sharedRefs().size(); i++) + if (head->sharedRefs()[i].digest() != + (*watchedHead)->sharedRefs()[i].digest()) { + same = false; + break; + } + } + + if (!same) { + *watchedHead = head; + const auto & plist = server->peerList(); + for (size_t i = 0; i < plist.size(); i++) + for (const auto & r : (*watchedHead)->sharedRefs()) + plist.at(i).send(myUUID, r); + } +} -- cgit v1.2.3