diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2021-04-24 21:20:25 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2021-04-24 21:20:25 +0200 |
commit | a511d2d1ef5fa07dde601961fe9394b474aad5ae (patch) | |
tree | 9dd7321cba6303583d044cae95ea622e96b55d45 | |
parent | 9aaba1211c95dc7e08437a7cca73452181e296d6 (diff) |
Behavior lens for shared refs from local state
-rw-r--r-- | include/erebos/frp.h | 28 | ||||
-rw-r--r-- | include/erebos/state.h | 2 | ||||
-rw-r--r-- | include/erebos/storage.h | 4 | ||||
-rw-r--r-- | include/erebos/sync.h | 8 | ||||
-rw-r--r-- | src/network.cpp | 2 | ||||
-rw-r--r-- | src/state.cpp | 12 | ||||
-rw-r--r-- | src/storage.cpp | 10 | ||||
-rw-r--r-- | src/sync.cpp | 39 |
8 files changed, 72 insertions, 33 deletions
diff --git a/include/erebos/frp.h b/include/erebos/frp.h index 587d2b6..b60b29f 100644 --- a/include/erebos/frp.h +++ b/include/erebos/frp.h @@ -4,7 +4,6 @@ #include <memory> #include <optional> #include <functional> -#include <type_traits> #include <tuple> #include <variant> @@ -115,6 +114,8 @@ public: return impl->get(ctime, x); } + template<typename C> BhvFun<A, C> lens() const; + const shared_ptr<BhvImpl<A, B>> impl; }; @@ -135,6 +136,8 @@ public: } Watched<A> watch(function<void(const A &)>); + template<typename C> BhvFun<monostate, C> lens() const; + const shared_ptr<BhvSource<A>> impl; }; @@ -227,4 +230,27 @@ BhvFun<A, C> operator>>(const BhvFun<A, B> & f, const BhvFun<B, C> & g) return impl; } + +template<typename A, typename B> +class BhvLens : public BhvImpl<A, B> +{ +public: + B get(const BhvCurTime &, const A & x) const override + { return A::template lens<B>(x); } +}; + +template<typename A, typename B> +template<typename C> +BhvFun<A, C> BhvFun<A, B>::lens() const +{ + return *this >> BhvFun<B, C>(make_shared<BhvLens<B, C>>()); +} + +template<typename A> +template<typename C> +BhvFun<monostate, C> BhvFun<monostate, A>::lens() const +{ + return *this >> BhvFun<A, C>(make_shared<BhvLens<A, C>>()); +} + } diff --git a/include/erebos/state.h b/include/erebos/state.h index b1567b0..7060f22 100644 --- a/include/erebos/state.h +++ b/include/erebos/state.h @@ -32,6 +32,8 @@ public: vector<Ref> sharedRefs() const; LocalState sharedRefAdd(const Ref &) const; + template<typename T> static T lens(const LocalState &); + private: vector<Ref> lookupShared(UUID) const; LocalState updateShared(UUID, const vector<Ref> &) const; diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 2d00cc3..9d28ec7 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -182,8 +182,8 @@ public: Ref & operator=(const Ref &) = default; Ref & operator=(Ref &&) = default; - bool operator==(const Ref &) = delete; - bool operator!=(const Ref &) = delete; + bool operator==(const Ref &) const; + bool operator!=(const Ref &) const; static std::optional<Ref> create(const Storage &, const Digest &); static Ref zcreate(const Storage &); diff --git a/include/erebos/sync.h b/include/erebos/sync.h index 0b9ed9a..dad4e0e 100644 --- a/include/erebos/sync.h +++ b/include/erebos/sync.h @@ -6,9 +6,12 @@ #include <optional> #include <mutex> +#include <vector> namespace erebos { +using std::vector; + class SyncService : public Service { public: @@ -22,11 +25,10 @@ public: private: void peerWatcher(size_t, const class Peer *); - void localStateWatcher(const Head<LocalState> &); + void localStateWatcher(const vector<Ref> &); const class Server * server; - std::mutex headMutex; - std::optional<WatchedHead<LocalState>> watchedHead; + std::optional<Watched<vector<Ref>>> watchedLocal; }; } diff --git a/src/network.cpp b/src/network.cpp index 259ae5e..8ee61b3 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -24,6 +24,8 @@ using namespace erebos; Server::Server(const Head<LocalState> & head, vector<unique_ptr<Service>> && svcs): p(new Priv(head, *head->identity(), std::move(svcs))) { + for (const auto & s : p->services) + s->serverStarted(*this); } Server:: Server(const std::shared_ptr<Priv> & ptr): diff --git a/src/state.cpp b/src/state.cpp index 6e39b5e..1a3381a 100644 --- a/src/state.cpp +++ b/src/state.cpp @@ -153,3 +153,15 @@ Ref SharedState::store(const Storage & st) const return st.storeObject(Record(std::move(items))); } + +template<> +optional<Identity> LocalState::lens<optional<Identity>>(const LocalState & x) +{ + return x.identity(); +} + +template<> +vector<Ref> LocalState::lens<vector<Ref>>(const LocalState & x) +{ + return x.sharedRefs(); +} diff --git a/src/storage.cpp b/src/storage.cpp index bd0beae..f68cb68 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -928,6 +928,16 @@ const PartialStorage & PartialRef::storage() const return *p->storage; } +bool Ref::operator==(const Ref & other) const +{ + return p->digest == other.p->digest; +} + +bool Ref::operator!=(const Ref & other) const +{ + return p->digest != other.p->digest; +} + optional<Ref> Ref::create(const Storage & st, const Digest & digest) { if (!st.p->backend->contains(digest)) diff --git a/src/sync.cpp b/src/sync.cpp index 37f7ca9..cf423ac 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -38,40 +38,25 @@ void SyncService::serverStarted(const Server & s) server = &s; server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this, std::placeholders::_1, std::placeholders::_2)); - watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this, + watchedLocal = server->localState().lens<vector<Ref>>().watch(std::bind(&SyncService::localStateWatcher, this, std::placeholders::_1)); } void SyncService::peerWatcher(size_t, const Peer * peer) { - if (peer && peer->identity()->finalOwner().sameAs( - server->identity().finalOwner())) { - scoped_lock lock(headMutex); - for (const auto & r : (*watchedHead)->sharedRefs()) - peer->send(myUUID, r); + if (peer) { + if (auto id = peer->identity()) { + if (id->finalOwner().sameAs(server->identity().finalOwner())) + for (const auto & r : server->localState().get().sharedRefs()) + peer->send(myUUID, r); + } } } -void SyncService::localStateWatcher(const Head<LocalState> & head) +void SyncService::localStateWatcher(const vector<Ref> & refs) { - scoped_lock lock(headMutex); - - bool same = head->sharedRefs().size() == - (*watchedHead)->sharedRefs().size(); - if (same) { - for (size_t i = 0; i < head->sharedRefs().size(); i++) - if (head->sharedRefs()[i].digest() != - (*watchedHead)->sharedRefs()[i].digest()) { - same = false; - break; - } - } - - if (!same) { - *watchedHead = head; - const auto & plist = server->peerList(); - for (size_t i = 0; i < plist.size(); i++) - for (const auto & r : (*watchedHead)->sharedRefs()) - plist.at(i).send(myUUID, r); - } + const auto & plist = server->peerList(); + for (size_t i = 0; i < plist.size(); i++) + for (const auto & r : refs) + plist.at(i).send(myUUID, r); } |