summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/frp.h28
-rw-r--r--include/erebos/state.h2
-rw-r--r--include/erebos/storage.h4
-rw-r--r--include/erebos/sync.h8
-rw-r--r--src/network.cpp2
-rw-r--r--src/state.cpp12
-rw-r--r--src/storage.cpp10
-rw-r--r--src/sync.cpp39
8 files changed, 72 insertions, 33 deletions
diff --git a/include/erebos/frp.h b/include/erebos/frp.h
index 587d2b6..b60b29f 100644
--- a/include/erebos/frp.h
+++ b/include/erebos/frp.h
@@ -4,7 +4,6 @@
#include <memory>
#include <optional>
#include <functional>
-#include <type_traits>
#include <tuple>
#include <variant>
@@ -115,6 +114,8 @@ public:
return impl->get(ctime, x);
}
+ template<typename C> BhvFun<A, C> lens() const;
+
const shared_ptr<BhvImpl<A, B>> impl;
};
@@ -135,6 +136,8 @@ public:
}
Watched<A> watch(function<void(const A &)>);
+ template<typename C> BhvFun<monostate, C> lens() const;
+
const shared_ptr<BhvSource<A>> impl;
};
@@ -227,4 +230,27 @@ BhvFun<A, C> operator>>(const BhvFun<A, B> & f, const BhvFun<B, C> & g)
return impl;
}
+
+template<typename A, typename B>
+class BhvLens : public BhvImpl<A, B>
+{
+public:
+ B get(const BhvCurTime &, const A & x) const override
+ { return A::template lens<B>(x); }
+};
+
+template<typename A, typename B>
+template<typename C>
+BhvFun<A, C> BhvFun<A, B>::lens() const
+{
+ return *this >> BhvFun<B, C>(make_shared<BhvLens<B, C>>());
+}
+
+template<typename A>
+template<typename C>
+BhvFun<monostate, C> BhvFun<monostate, A>::lens() const
+{
+ return *this >> BhvFun<A, C>(make_shared<BhvLens<A, C>>());
+}
+
}
diff --git a/include/erebos/state.h b/include/erebos/state.h
index b1567b0..7060f22 100644
--- a/include/erebos/state.h
+++ b/include/erebos/state.h
@@ -32,6 +32,8 @@ public:
vector<Ref> sharedRefs() const;
LocalState sharedRefAdd(const Ref &) const;
+ template<typename T> static T lens(const LocalState &);
+
private:
vector<Ref> lookupShared(UUID) const;
LocalState updateShared(UUID, const vector<Ref> &) const;
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 2d00cc3..9d28ec7 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -182,8 +182,8 @@ public:
Ref & operator=(const Ref &) = default;
Ref & operator=(Ref &&) = default;
- bool operator==(const Ref &) = delete;
- bool operator!=(const Ref &) = delete;
+ bool operator==(const Ref &) const;
+ bool operator!=(const Ref &) const;
static std::optional<Ref> create(const Storage &, const Digest &);
static Ref zcreate(const Storage &);
diff --git a/include/erebos/sync.h b/include/erebos/sync.h
index 0b9ed9a..dad4e0e 100644
--- a/include/erebos/sync.h
+++ b/include/erebos/sync.h
@@ -6,9 +6,12 @@
#include <optional>
#include <mutex>
+#include <vector>
namespace erebos {
+using std::vector;
+
class SyncService : public Service
{
public:
@@ -22,11 +25,10 @@ public:
private:
void peerWatcher(size_t, const class Peer *);
- void localStateWatcher(const Head<LocalState> &);
+ void localStateWatcher(const vector<Ref> &);
const class Server * server;
- std::mutex headMutex;
- std::optional<WatchedHead<LocalState>> watchedHead;
+ std::optional<Watched<vector<Ref>>> watchedLocal;
};
}
diff --git a/src/network.cpp b/src/network.cpp
index 259ae5e..8ee61b3 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -24,6 +24,8 @@ using namespace erebos;
Server::Server(const Head<LocalState> & head, vector<unique_ptr<Service>> && svcs):
p(new Priv(head, *head->identity(), std::move(svcs)))
{
+ for (const auto & s : p->services)
+ s->serverStarted(*this);
}
Server:: Server(const std::shared_ptr<Priv> & ptr):
diff --git a/src/state.cpp b/src/state.cpp
index 6e39b5e..1a3381a 100644
--- a/src/state.cpp
+++ b/src/state.cpp
@@ -153,3 +153,15 @@ Ref SharedState::store(const Storage & st) const
return st.storeObject(Record(std::move(items)));
}
+
+template<>
+optional<Identity> LocalState::lens<optional<Identity>>(const LocalState & x)
+{
+ return x.identity();
+}
+
+template<>
+vector<Ref> LocalState::lens<vector<Ref>>(const LocalState & x)
+{
+ return x.sharedRefs();
+}
diff --git a/src/storage.cpp b/src/storage.cpp
index bd0beae..f68cb68 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -928,6 +928,16 @@ const PartialStorage & PartialRef::storage() const
return *p->storage;
}
+bool Ref::operator==(const Ref & other) const
+{
+ return p->digest == other.p->digest;
+}
+
+bool Ref::operator!=(const Ref & other) const
+{
+ return p->digest != other.p->digest;
+}
+
optional<Ref> Ref::create(const Storage & st, const Digest & digest)
{
if (!st.p->backend->contains(digest))
diff --git a/src/sync.cpp b/src/sync.cpp
index 37f7ca9..cf423ac 100644
--- a/src/sync.cpp
+++ b/src/sync.cpp
@@ -38,40 +38,25 @@ void SyncService::serverStarted(const Server & s)
server = &s;
server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this,
std::placeholders::_1, std::placeholders::_2));
- watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this,
+ watchedLocal = server->localState().lens<vector<Ref>>().watch(std::bind(&SyncService::localStateWatcher, this,
std::placeholders::_1));
}
void SyncService::peerWatcher(size_t, const Peer * peer)
{
- if (peer && peer->identity()->finalOwner().sameAs(
- server->identity().finalOwner())) {
- scoped_lock lock(headMutex);
- for (const auto & r : (*watchedHead)->sharedRefs())
- peer->send(myUUID, r);
+ if (peer) {
+ if (auto id = peer->identity()) {
+ if (id->finalOwner().sameAs(server->identity().finalOwner()))
+ for (const auto & r : server->localState().get().sharedRefs())
+ peer->send(myUUID, r);
+ }
}
}
-void SyncService::localStateWatcher(const Head<LocalState> & head)
+void SyncService::localStateWatcher(const vector<Ref> & refs)
{
- scoped_lock lock(headMutex);
-
- bool same = head->sharedRefs().size() ==
- (*watchedHead)->sharedRefs().size();
- if (same) {
- for (size_t i = 0; i < head->sharedRefs().size(); i++)
- if (head->sharedRefs()[i].digest() !=
- (*watchedHead)->sharedRefs()[i].digest()) {
- same = false;
- break;
- }
- }
-
- if (!same) {
- *watchedHead = head;
- const auto & plist = server->peerList();
- for (size_t i = 0; i < plist.size(); i++)
- for (const auto & r : (*watchedHead)->sharedRefs())
- plist.at(i).send(myUUID, r);
- }
+ const auto & plist = server->peerList();
+ for (size_t i = 0; i < plist.size(); i++)
+ for (const auto & r : refs)
+ plist.at(i).send(myUUID, r);
}