From c3d6046b25ef0786b8d2919dfa9db4eb05114501 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 21 Feb 2021 22:16:21 +0100 Subject: Sync service --- src/CMakeLists.txt | 1 + src/service.cpp | 4 +++ src/state.cpp | 34 +++++++++++++++++++++--- src/sync.cpp | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 src/sync.cpp (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d8a076a..61f491f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,7 @@ add_library(erebos service state storage + sync time uuid ) diff --git a/src/service.cpp b/src/service.cpp index c731990..14ff665 100644 --- a/src/service.cpp +++ b/src/service.cpp @@ -33,3 +33,7 @@ void Service::Context::local(const LocalState & ls) { p->local = p->local.ref().storage().store(ls); } + +void Service::serverStarted(const class Server &) +{ +} diff --git a/src/state.cpp b/src/state.cpp index 8790dfc..6e39b5e 100644 --- a/src/state.cpp +++ b/src/state.cpp @@ -78,13 +78,41 @@ vector LocalState::lookupShared(UUID type) const return res; } +vector LocalState::sharedRefs() const +{ + vector refs; + for (const auto & x : p->shared) + refs.push_back(x.ref()); + return refs; +} + +LocalState LocalState::sharedRefAdd(const Ref & ref) const +{ + const Storage * st; + if (p->shared.size() > 0) + st = &p->shared[0].ref().storage(); + else if (p->identity) + st = &p->identity->ref()->storage(); + else + st = &ref.storage(); + + LocalState ret; + ret.p->identity = p->identity; + ret.p->shared = p->shared; + ret.p->shared.push_back(SharedState(ref).store(*st)); + filterAncestors(ret.p->shared); + return ret; +} + LocalState LocalState::updateShared(UUID type, const vector & xs) const { const Storage * st; - if (xs.size() > 0) - st = &xs[0].storage(); - else if (p->shared.size() > 0) + if (p->shared.size() > 0) st = &p->shared[0].ref().storage(); + else if (p->identity) + st = &p->identity->ref()->storage(); + else if (xs.size() > 0) + st = &xs[0].storage(); else return *this; diff --git a/src/sync.cpp b/src/sync.cpp new file mode 100644 index 0000000..37f7ca9 --- /dev/null +++ b/src/sync.cpp @@ -0,0 +1,77 @@ +#include + +#include + +using namespace erebos; + +using std::scoped_lock; + +static const UUID myUUID("a4f538d0-4e50-4082-8e10-7e3ec2af175d"); + +SyncService::SyncService() = default; +SyncService::~SyncService() = default; + +UUID SyncService::uuid() const +{ + return myUUID; +} + +void SyncService::handle(Context & ctx) +{ + auto pid = ctx.peer().identity(); + if (!pid) + return; + + const auto & powner = pid->finalOwner(); + const auto & owner = ctx.peer().server().identity().finalOwner(); + + if (!powner.sameAs(owner)) + return; + + ctx.local( + ctx.local()->sharedRefAdd(ctx.ref()) + ); +} + +void SyncService::serverStarted(const Server & s) +{ + server = &s; + server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this, + std::placeholders::_1, std::placeholders::_2)); + watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this, + std::placeholders::_1)); +} + +void SyncService::peerWatcher(size_t, const Peer * peer) +{ + if (peer && peer->identity()->finalOwner().sameAs( + server->identity().finalOwner())) { + scoped_lock lock(headMutex); + for (const auto & r : (*watchedHead)->sharedRefs()) + peer->send(myUUID, r); + } +} + +void SyncService::localStateWatcher(const Head & head) +{ + scoped_lock lock(headMutex); + + bool same = head->sharedRefs().size() == + (*watchedHead)->sharedRefs().size(); + if (same) { + for (size_t i = 0; i < head->sharedRefs().size(); i++) + if (head->sharedRefs()[i].digest() != + (*watchedHead)->sharedRefs()[i].digest()) { + same = false; + break; + } + } + + if (!same) { + *watchedHead = head; + const auto & plist = server->peerList(); + for (size_t i = 0; i < plist.size(); i++) + for (const auto & r : (*watchedHead)->sharedRefs()) + plist.at(i).send(myUUID, r); + } +} -- cgit v1.2.3