summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/service.h2
-rw-r--r--include/erebos/state.h5
-rw-r--r--include/erebos/storage.h8
-rw-r--r--include/erebos/sync.h32
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/service.cpp4
-rw-r--r--src/state.cpp34
-rw-r--r--src/sync.cpp77
8 files changed, 157 insertions, 6 deletions
diff --git a/include/erebos/service.h b/include/erebos/service.h
index cec5ea0..ba06495 100644
--- a/include/erebos/service.h
+++ b/include/erebos/service.h
@@ -32,6 +32,8 @@ public:
virtual UUID uuid() const = 0;
virtual void handle(Context &) = 0;
+
+ virtual void serverStarted(const class Server &);
};
}
diff --git a/include/erebos/state.h b/include/erebos/state.h
index 543e03c..b1567b0 100644
--- a/include/erebos/state.h
+++ b/include/erebos/state.h
@@ -29,6 +29,9 @@ public:
template<class T> LocalState shared(const Storage & st, const T & x)
{ return updateShared(T::sharedTypeId, x.store(st)); }
+ vector<Ref> sharedRefs() const;
+ LocalState sharedRefAdd(const Ref &) const;
+
private:
vector<Ref> lookupShared(UUID) const;
LocalState updateShared(UUID, const vector<Ref> &) const;
@@ -47,7 +50,7 @@ template<class T>
LocalState LocalState::shared(const vector<Stored<T>> & v) const
{
vector<Ref> refs;
- for (const auto x : v)
+ for (const auto & x : v)
refs.push_back(x.ref());
return updateShared(T::sharedTypeId, refs);
}
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 75b8139..7ec73ab 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -507,12 +507,16 @@ class WatchedHead : public Head<T>
friend class Head<T>;
WatchedHead(const Head<T> & h, int watcherId):
Head<T>(h), watcherId(watcherId) {}
+ int watcherId;
+
+public:
WatchedHead(WatchedHead<T> && h):
Head<T>(h), watcherId(h.watcherId)
{ h.watcherId = -1; }
- int watcherId;
-public:
+ WatchedHead<T> & operator=(WatchedHead<T> && h)
+ { watcherId = h.watcherId; h.watcherId = -1; return *this; }
+
WatchedHead<T> & operator=(const Head<T> & h) {
if (Head<T>::id() != h.id())
throw std::runtime_error("WatchedHead ID mismatch");
diff --git a/include/erebos/sync.h b/include/erebos/sync.h
new file mode 100644
index 0000000..0b9ed9a
--- /dev/null
+++ b/include/erebos/sync.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <erebos/service.h>
+#include <erebos/state.h>
+#include <erebos/storage.h>
+
+#include <optional>
+#include <mutex>
+
+namespace erebos {
+
+class SyncService : public Service
+{
+public:
+ SyncService();
+ virtual ~SyncService();
+
+ UUID uuid() const override;
+ void handle(Context &) override;
+
+ void serverStarted(const class Server &) override;
+
+private:
+ void peerWatcher(size_t, const class Peer *);
+ void localStateWatcher(const Head<LocalState> &);
+
+ const class Server * server;
+ std::mutex headMutex;
+ std::optional<WatchedHead<LocalState>> watchedHead;
+};
+
+}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d8a076a..61f491f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -13,6 +13,7 @@ add_library(erebos
service
state
storage
+ sync
time
uuid
)
diff --git a/src/service.cpp b/src/service.cpp
index c731990..14ff665 100644
--- a/src/service.cpp
+++ b/src/service.cpp
@@ -33,3 +33,7 @@ void Service::Context::local(const LocalState & ls)
{
p->local = p->local.ref().storage().store(ls);
}
+
+void Service::serverStarted(const class Server &)
+{
+}
diff --git a/src/state.cpp b/src/state.cpp
index 8790dfc..6e39b5e 100644
--- a/src/state.cpp
+++ b/src/state.cpp
@@ -78,13 +78,41 @@ vector<Ref> LocalState::lookupShared(UUID type) const
return res;
}
+vector<Ref> LocalState::sharedRefs() const
+{
+ vector<Ref> refs;
+ for (const auto & x : p->shared)
+ refs.push_back(x.ref());
+ return refs;
+}
+
+LocalState LocalState::sharedRefAdd(const Ref & ref) const
+{
+ const Storage * st;
+ if (p->shared.size() > 0)
+ st = &p->shared[0].ref().storage();
+ else if (p->identity)
+ st = &p->identity->ref()->storage();
+ else
+ st = &ref.storage();
+
+ LocalState ret;
+ ret.p->identity = p->identity;
+ ret.p->shared = p->shared;
+ ret.p->shared.push_back(SharedState(ref).store(*st));
+ filterAncestors(ret.p->shared);
+ return ret;
+}
+
LocalState LocalState::updateShared(UUID type, const vector<Ref> & xs) const
{
const Storage * st;
- if (xs.size() > 0)
- st = &xs[0].storage();
- else if (p->shared.size() > 0)
+ if (p->shared.size() > 0)
st = &p->shared[0].ref().storage();
+ else if (p->identity)
+ st = &p->identity->ref()->storage();
+ else if (xs.size() > 0)
+ st = &xs[0].storage();
else
return *this;
diff --git a/src/sync.cpp b/src/sync.cpp
new file mode 100644
index 0000000..37f7ca9
--- /dev/null
+++ b/src/sync.cpp
@@ -0,0 +1,77 @@
+#include <erebos/sync.h>
+
+#include <erebos/network.h>
+
+using namespace erebos;
+
+using std::scoped_lock;
+
+static const UUID myUUID("a4f538d0-4e50-4082-8e10-7e3ec2af175d");
+
+SyncService::SyncService() = default;
+SyncService::~SyncService() = default;
+
+UUID SyncService::uuid() const
+{
+ return myUUID;
+}
+
+void SyncService::handle(Context & ctx)
+{
+ auto pid = ctx.peer().identity();
+ if (!pid)
+ return;
+
+ const auto & powner = pid->finalOwner();
+ const auto & owner = ctx.peer().server().identity().finalOwner();
+
+ if (!powner.sameAs(owner))
+ return;
+
+ ctx.local(
+ ctx.local()->sharedRefAdd(ctx.ref())
+ );
+}
+
+void SyncService::serverStarted(const Server & s)
+{
+ server = &s;
+ server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this,
+ std::placeholders::_1, std::placeholders::_2));
+ watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this,
+ std::placeholders::_1));
+}
+
+void SyncService::peerWatcher(size_t, const Peer * peer)
+{
+ if (peer && peer->identity()->finalOwner().sameAs(
+ server->identity().finalOwner())) {
+ scoped_lock lock(headMutex);
+ for (const auto & r : (*watchedHead)->sharedRefs())
+ peer->send(myUUID, r);
+ }
+}
+
+void SyncService::localStateWatcher(const Head<LocalState> & head)
+{
+ scoped_lock lock(headMutex);
+
+ bool same = head->sharedRefs().size() ==
+ (*watchedHead)->sharedRefs().size();
+ if (same) {
+ for (size_t i = 0; i < head->sharedRefs().size(); i++)
+ if (head->sharedRefs()[i].digest() !=
+ (*watchedHead)->sharedRefs()[i].digest()) {
+ same = false;
+ break;
+ }
+ }
+
+ if (!same) {
+ *watchedHead = head;
+ const auto & plist = server->peerList();
+ for (size_t i = 0; i < plist.size(); i++)
+ for (const auto & r : (*watchedHead)->sharedRefs())
+ plist.at(i).send(myUUID, r);
+ }
+}