summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/service.cpp4
-rw-r--r--src/state.cpp34
-rw-r--r--src/sync.cpp77
4 files changed, 113 insertions, 3 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d8a076a..61f491f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -13,6 +13,7 @@ add_library(erebos
service
state
storage
+ sync
time
uuid
)
diff --git a/src/service.cpp b/src/service.cpp
index c731990..14ff665 100644
--- a/src/service.cpp
+++ b/src/service.cpp
@@ -33,3 +33,7 @@ void Service::Context::local(const LocalState & ls)
{
p->local = p->local.ref().storage().store(ls);
}
+
+void Service::serverStarted(const class Server &)
+{
+}
diff --git a/src/state.cpp b/src/state.cpp
index 8790dfc..6e39b5e 100644
--- a/src/state.cpp
+++ b/src/state.cpp
@@ -78,13 +78,41 @@ vector<Ref> LocalState::lookupShared(UUID type) const
return res;
}
+vector<Ref> LocalState::sharedRefs() const
+{
+ vector<Ref> refs;
+ for (const auto & x : p->shared)
+ refs.push_back(x.ref());
+ return refs;
+}
+
+LocalState LocalState::sharedRefAdd(const Ref & ref) const
+{
+ const Storage * st;
+ if (p->shared.size() > 0)
+ st = &p->shared[0].ref().storage();
+ else if (p->identity)
+ st = &p->identity->ref()->storage();
+ else
+ st = &ref.storage();
+
+ LocalState ret;
+ ret.p->identity = p->identity;
+ ret.p->shared = p->shared;
+ ret.p->shared.push_back(SharedState(ref).store(*st));
+ filterAncestors(ret.p->shared);
+ return ret;
+}
+
LocalState LocalState::updateShared(UUID type, const vector<Ref> & xs) const
{
const Storage * st;
- if (xs.size() > 0)
- st = &xs[0].storage();
- else if (p->shared.size() > 0)
+ if (p->shared.size() > 0)
st = &p->shared[0].ref().storage();
+ else if (p->identity)
+ st = &p->identity->ref()->storage();
+ else if (xs.size() > 0)
+ st = &xs[0].storage();
else
return *this;
diff --git a/src/sync.cpp b/src/sync.cpp
new file mode 100644
index 0000000..37f7ca9
--- /dev/null
+++ b/src/sync.cpp
@@ -0,0 +1,77 @@
+#include <erebos/sync.h>
+
+#include <erebos/network.h>
+
+using namespace erebos;
+
+using std::scoped_lock;
+
+static const UUID myUUID("a4f538d0-4e50-4082-8e10-7e3ec2af175d");
+
+SyncService::SyncService() = default;
+SyncService::~SyncService() = default;
+
+UUID SyncService::uuid() const
+{
+ return myUUID;
+}
+
+void SyncService::handle(Context & ctx)
+{
+ auto pid = ctx.peer().identity();
+ if (!pid)
+ return;
+
+ const auto & powner = pid->finalOwner();
+ const auto & owner = ctx.peer().server().identity().finalOwner();
+
+ if (!powner.sameAs(owner))
+ return;
+
+ ctx.local(
+ ctx.local()->sharedRefAdd(ctx.ref())
+ );
+}
+
+void SyncService::serverStarted(const Server & s)
+{
+ server = &s;
+ server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this,
+ std::placeholders::_1, std::placeholders::_2));
+ watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this,
+ std::placeholders::_1));
+}
+
+void SyncService::peerWatcher(size_t, const Peer * peer)
+{
+ if (peer && peer->identity()->finalOwner().sameAs(
+ server->identity().finalOwner())) {
+ scoped_lock lock(headMutex);
+ for (const auto & r : (*watchedHead)->sharedRefs())
+ peer->send(myUUID, r);
+ }
+}
+
+void SyncService::localStateWatcher(const Head<LocalState> & head)
+{
+ scoped_lock lock(headMutex);
+
+ bool same = head->sharedRefs().size() ==
+ (*watchedHead)->sharedRefs().size();
+ if (same) {
+ for (size_t i = 0; i < head->sharedRefs().size(); i++)
+ if (head->sharedRefs()[i].digest() !=
+ (*watchedHead)->sharedRefs()[i].digest()) {
+ same = false;
+ break;
+ }
+ }
+
+ if (!same) {
+ *watchedHead = head;
+ const auto & plist = server->peerList();
+ for (size_t i = 0; i < plist.size(); i++)
+ for (const auto & r : (*watchedHead)->sharedRefs())
+ plist.at(i).send(myUUID, r);
+ }
+}