From a511d2d1ef5fa07dde601961fe9394b474aad5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 24 Apr 2021 21:20:25 +0200 Subject: Behavior lens for shared refs from local state --- include/erebos/frp.h | 28 +++++++++++++++++++++++++++- include/erebos/state.h | 2 ++ include/erebos/storage.h | 4 ++-- include/erebos/sync.h | 8 +++++--- src/network.cpp | 2 ++ src/state.cpp | 12 ++++++++++++ src/storage.cpp | 10 ++++++++++ src/sync.cpp | 39 ++++++++++++--------------------------- 8 files changed, 72 insertions(+), 33 deletions(-) diff --git a/include/erebos/frp.h b/include/erebos/frp.h index 587d2b6..b60b29f 100644 --- a/include/erebos/frp.h +++ b/include/erebos/frp.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -115,6 +114,8 @@ public: return impl->get(ctime, x); } + template BhvFun lens() const; + const shared_ptr> impl; }; @@ -135,6 +136,8 @@ public: } Watched watch(function); + template BhvFun lens() const; + const shared_ptr> impl; }; @@ -227,4 +230,27 @@ BhvFun operator>>(const BhvFun & f, const BhvFun & g) return impl; } + +template +class BhvLens : public BhvImpl +{ +public: + B get(const BhvCurTime &, const A & x) const override + { return A::template lens(x); } +}; + +template +template +BhvFun BhvFun::lens() const +{ + return *this >> BhvFun(make_shared>()); +} + +template +template +BhvFun BhvFun::lens() const +{ + return *this >> BhvFun(make_shared>()); +} + } diff --git a/include/erebos/state.h b/include/erebos/state.h index b1567b0..7060f22 100644 --- a/include/erebos/state.h +++ b/include/erebos/state.h @@ -32,6 +32,8 @@ public: vector sharedRefs() const; LocalState sharedRefAdd(const Ref &) const; + template static T lens(const LocalState &); + private: vector lookupShared(UUID) const; LocalState updateShared(UUID, const vector &) const; diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 2d00cc3..9d28ec7 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -182,8 +182,8 @@ public: Ref & operator=(const Ref &) = default; Ref & operator=(Ref &&) = default; - bool operator==(const Ref &) = delete; - bool operator!=(const Ref &) = delete; + bool operator==(const Ref &) const; + bool operator!=(const Ref &) const; static std::optional create(const Storage &, const Digest &); static Ref zcreate(const Storage &); diff --git a/include/erebos/sync.h b/include/erebos/sync.h index 0b9ed9a..dad4e0e 100644 --- a/include/erebos/sync.h +++ b/include/erebos/sync.h @@ -6,9 +6,12 @@ #include #include +#include namespace erebos { +using std::vector; + class SyncService : public Service { public: @@ -22,11 +25,10 @@ public: private: void peerWatcher(size_t, const class Peer *); - void localStateWatcher(const Head &); + void localStateWatcher(const vector &); const class Server * server; - std::mutex headMutex; - std::optional> watchedHead; + std::optional>> watchedLocal; }; } diff --git a/src/network.cpp b/src/network.cpp index 259ae5e..8ee61b3 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -24,6 +24,8 @@ using namespace erebos; Server::Server(const Head & head, vector> && svcs): p(new Priv(head, *head->identity(), std::move(svcs))) { + for (const auto & s : p->services) + s->serverStarted(*this); } Server:: Server(const std::shared_ptr & ptr): diff --git a/src/state.cpp b/src/state.cpp index 6e39b5e..1a3381a 100644 --- a/src/state.cpp +++ b/src/state.cpp @@ -153,3 +153,15 @@ Ref SharedState::store(const Storage & st) const return st.storeObject(Record(std::move(items))); } + +template<> +optional LocalState::lens>(const LocalState & x) +{ + return x.identity(); +} + +template<> +vector LocalState::lens>(const LocalState & x) +{ + return x.sharedRefs(); +} diff --git a/src/storage.cpp b/src/storage.cpp index bd0beae..f68cb68 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -928,6 +928,16 @@ const PartialStorage & PartialRef::storage() const return *p->storage; } +bool Ref::operator==(const Ref & other) const +{ + return p->digest == other.p->digest; +} + +bool Ref::operator!=(const Ref & other) const +{ + return p->digest != other.p->digest; +} + optional Ref::create(const Storage & st, const Digest & digest) { if (!st.p->backend->contains(digest)) diff --git a/src/sync.cpp b/src/sync.cpp index 37f7ca9..cf423ac 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -38,40 +38,25 @@ void SyncService::serverStarted(const Server & s) server = &s; server->peerList().onUpdate(std::bind(&SyncService::peerWatcher, this, std::placeholders::_1, std::placeholders::_2)); - watchedHead = server->localHead().watch(std::bind(&SyncService::localStateWatcher, this, + watchedLocal = server->localState().lens>().watch(std::bind(&SyncService::localStateWatcher, this, std::placeholders::_1)); } void SyncService::peerWatcher(size_t, const Peer * peer) { - if (peer && peer->identity()->finalOwner().sameAs( - server->identity().finalOwner())) { - scoped_lock lock(headMutex); - for (const auto & r : (*watchedHead)->sharedRefs()) - peer->send(myUUID, r); + if (peer) { + if (auto id = peer->identity()) { + if (id->finalOwner().sameAs(server->identity().finalOwner())) + for (const auto & r : server->localState().get().sharedRefs()) + peer->send(myUUID, r); + } } } -void SyncService::localStateWatcher(const Head & head) +void SyncService::localStateWatcher(const vector & refs) { - scoped_lock lock(headMutex); - - bool same = head->sharedRefs().size() == - (*watchedHead)->sharedRefs().size(); - if (same) { - for (size_t i = 0; i < head->sharedRefs().size(); i++) - if (head->sharedRefs()[i].digest() != - (*watchedHead)->sharedRefs()[i].digest()) { - same = false; - break; - } - } - - if (!same) { - *watchedHead = head; - const auto & plist = server->peerList(); - for (size_t i = 0; i < plist.size(); i++) - for (const auto & r : (*watchedHead)->sharedRefs()) - plist.at(i).send(myUUID, r); - } + const auto & plist = server->peerList(); + for (size_t i = 0; i < plist.size(); i++) + for (const auto & r : refs) + plist.at(i).send(myUUID, r); } -- cgit v1.2.3