From 9aaba1211c95dc7e08437a7cca73452181e296d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 18 Apr 2021 22:25:17 +0200 Subject: Initial support for FRP behaviors --- include/erebos/frp.h | 230 ++++++++++++++++++++++++++++++++++++++++++++++ include/erebos/identity.h | 2 + include/erebos/network.h | 1 + include/erebos/storage.h | 26 ++++++ src/CMakeLists.txt | 1 + src/frp.cpp | 133 +++++++++++++++++++++++++++ src/identity.cpp | 10 ++ src/network.cpp | 6 ++ src/network.h | 1 + 9 files changed, 410 insertions(+) create mode 100644 include/erebos/frp.h create mode 100644 src/frp.cpp diff --git a/include/erebos/frp.h b/include/erebos/frp.h new file mode 100644 index 0000000..587d2b6 --- /dev/null +++ b/include/erebos/frp.h @@ -0,0 +1,230 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace erebos { + +using std::enable_if_t; +using std::function; +using std::is_same_v; +using std::make_shared; +using std::monostate; +using std::optional; +using std::shared_ptr; +using std::static_pointer_cast; +using std::tuple; +using std::vector; +using std::weak_ptr; + +class BhvTime +{ + BhvTime(uint64_t t): t(t) {} + friend class BhvCurTime; +public: + +private: + uint64_t t; +}; + +class BhvCurTime +{ +public: + BhvCurTime(); + ~BhvCurTime(); + BhvCurTime(const BhvCurTime &) = delete; + BhvCurTime(BhvCurTime &&); + + BhvCurTime & operator=(const BhvCurTime &) = delete; + BhvCurTime & operator=(BhvCurTime &&); + +private: + optional t; +}; + +template +class Watched +{ +public: + Watched(shared_ptr> && cb): + cb(move(cb)) {} + ~Watched(); + +private: + shared_ptr> cb; +}; + +template +Watched::~Watched() +{ + BhvCurTime ctime; + cb.reset(); +} + +class BhvImplBase : public std::enable_shared_from_this +{ +public: + virtual ~BhvImplBase(); + +protected: + void dependsOn(shared_ptr other); + void updated(const BhvCurTime &); + virtual bool needsUpdate(const BhvCurTime &) const; + virtual void doUpdate(const BhvCurTime &); + + vector>> watchers; +private: + void markDirty(const BhvCurTime &, vector> &); + void updateDirty(const BhvCurTime &); + + bool dirty = false; + vector> depends; + vector> rdepends; + + template friend class BhvFun; +}; + +template +class BhvImpl : public BhvImplBase +{ +public: + virtual B get(const BhvCurTime &, const A &) const = 0; +}; + +template +using BhvSource = BhvImpl; + +template +class BhvFun +{ +public: + BhvFun(shared_ptr> impl): + impl(move(impl)) {} + + template BhvFun(shared_ptr impl): + BhvFun(static_pointer_cast>(impl)) {} + + B get(const A & x) const + { + BhvCurTime ctime; + return impl->get(ctime, x); + } + + const shared_ptr> impl; +}; + +template +class BhvFun +{ +public: + BhvFun(shared_ptr> impl): + impl(move(impl)) {} + + template BhvFun(shared_ptr impl): + BhvFun(static_pointer_cast>(impl)) {} + + A get() const + { + BhvCurTime ctime; + return impl->get(ctime, monostate()); + } + Watched watch(function); + + const shared_ptr> impl; +}; + +template +using Bhv = BhvFun; + +template +Watched Bhv::watch(function f) +{ + auto cb = make_shared>( + [impl = BhvFun::impl, f] (const BhvCurTime & ctime) { + f(impl->get(ctime, monostate())); + }); + BhvFun::impl->watchers.push_back(cb); + return Watched(move(cb)); +} + + +template +class BhvLambda : public BhvImpl +{ +public: + BhvLambda(function f): f(f) {} + + B get(const BhvCurTime &, const A & x) const override + { return f(x); } + +private: + function f; +}; + +template +BhvFun bfun(function f) +{ + return make_shared>(f); +} + + +template class BhvComp; +template +BhvFun operator>>(const BhvFun & f, const BhvFun & g); + +template +class BhvComp : public BhvImpl +{ +public: + BhvComp(const BhvFun & f, const BhvFun): + f(f), g(g) {} + + C get(const BhvCurTime & ctime, const A & x) const override + { return g.impl.get(ctime, f.impl.get(ctime, x)); } + +private: + BhvFun f; + BhvFun g; + + friend BhvFun operator>> (const BhvFun &, const BhvFun &); +}; + +template +class BhvComp : public BhvSource +{ +public: + BhvComp(const BhvFun & f, const BhvFun & g): + f(f), g(g) {} + + bool needsUpdate(const BhvCurTime & ctime) const override + { return !x || g.impl->get(ctime, f.impl->get(ctime, monostate())) != x.value(); } + + void doUpdate(const BhvCurTime & ctime) override + { x = g.impl->get(ctime, f.impl->get(ctime, monostate())); } + + C get(const BhvCurTime & ctime, const monostate & m) const override + { return x ? x.value() : g.impl->get(ctime, f.impl->get(ctime, m)); } + +private: + BhvFun f; + BhvFun g; + optional x; + + friend BhvFun operator>> (const BhvFun &, const BhvFun &); +}; + +template +BhvFun operator>>(const BhvFun & f, const BhvFun & g) +{ + auto impl = make_shared>(f, g); + impl->dependsOn(f.impl); + impl->dependsOn(g.impl); + return impl; +} + +} diff --git a/include/erebos/identity.h b/include/erebos/identity.h index aeddd08..f7b17b8 100644 --- a/include/erebos/identity.h +++ b/include/erebos/identity.h @@ -24,6 +24,8 @@ public: Stored keyMessage() const; bool sameAs(const Identity &) const; + bool operator==(const Identity & other) const; + bool operator!=(const Identity & other) const; std::optional ref() const; diff --git a/include/erebos/network.h b/include/erebos/network.h index 5a1fca6..f2ae191 100644 --- a/include/erebos/network.h +++ b/include/erebos/network.h @@ -17,6 +17,7 @@ public: ~Server(); const Head & localHead() const; + const Bhv & localState() const; const Identity & identity() const; template S & svc(); diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 7ec73ab..2d00cc3 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -496,6 +497,8 @@ public: std::optional> update(const std::function(const Stored &)> &) const; WatchedHead watch(const std::function &)> &) const; + Bhv behavior() const; + private: UUID mid; Stored mstored; @@ -526,6 +529,23 @@ public: ~WatchedHead(); }; +template +class HeadBhv : public BhvSource +{ +public: + HeadBhv(const Head & head): + whead(head.watch([this] (const Head & cur) { + BhvCurTime ctime; + whead = cur; + BhvImplBase::updated(ctime); + })) {} + + T get(const BhvCurTime &, const std::monostate &) const { return *whead; } + +private: + WatchedHead whead; +}; + template std::optional> Storage::head(UUID id) const { @@ -583,6 +603,12 @@ WatchedHead Head::watch(const std::function &)> & watch return WatchedHead(*this, wid); } +template +Bhv Head::behavior() const +{ + return make_shared>(*this); +} + template WatchedHead::~WatchedHead() { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 61f491f..8f65555 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories( add_library(erebos attach channel + frp identity message network diff --git a/src/frp.cpp b/src/frp.cpp new file mode 100644 index 0000000..142fcd4 --- /dev/null +++ b/src/frp.cpp @@ -0,0 +1,133 @@ +#include + +#include +#include + +using namespace erebos; + +using std::condition_variable; +using std::move; +using std::mutex; +using std::nullopt; +using std::unique_lock; +using std::weak_ptr; + +mutex bhvTimeMutex; +condition_variable bhvTimeCond; +bool bhvTimeRunning = false; +uint64_t bhvTimeLast = 0; + +BhvCurTime::BhvCurTime() +{ + unique_lock lock(bhvTimeMutex); + bhvTimeCond.wait(lock, []{ return !bhvTimeRunning; }); + + bhvTimeRunning = true; + t = BhvTime(++bhvTimeLast); +} + +BhvCurTime::~BhvCurTime() +{ + if (t) { + unique_lock lock(bhvTimeMutex); + bhvTimeRunning = false; + lock.unlock(); + bhvTimeCond.notify_one(); + } +} + +BhvCurTime::BhvCurTime(BhvCurTime && other) +{ + t = other.t; + other.t = nullopt; +} + +BhvCurTime & BhvCurTime::operator=(BhvCurTime && other) +{ + t = other.t; + other.t = nullopt; + return *this; +} + + +BhvImplBase::~BhvImplBase() = default; + +void BhvImplBase::dependsOn(shared_ptr other) +{ + depends.push_back(other); + other->rdepends.push_back(shared_from_this()); +} + +void BhvImplBase::updated(const BhvCurTime & ctime) +{ + vector> toUpdate; + markDirty(ctime, toUpdate); + + for (auto & bhv : toUpdate) + bhv->updateDirty(ctime); +} + +void BhvImplBase::markDirty(const BhvCurTime & ctime, vector> & toUpdate) +{ + if (dirty) + return; + + if (!needsUpdate(ctime)) + return; + + dirty = true; + toUpdate.push_back(shared_from_this()); + + bool prune = false; + for (const auto & w : rdepends) { + if (auto b = w.lock()) + b->markDirty(ctime, toUpdate); + else + prune = true; + } + + if (prune) { + decltype(rdepends) pruned; + for (const auto & w : rdepends) + if (!w.expired()) + pruned.push_back(move(w)); + rdepends = move(pruned); + } +} + +void BhvImplBase::updateDirty(const BhvCurTime & ctime) +{ + if (!dirty) + return; + + for (auto & d : depends) + d->updateDirty(ctime); + + doUpdate(ctime); + dirty = false; + + bool prune = false; + for (const auto & wcb : watchers) { + if (auto cb = wcb.lock()) + (*cb)(ctime); + else + prune = true; + } + + if (prune) { + decltype(watchers) pruned; + for (const auto & w : watchers) + if (!w.expired()) + pruned.push_back(move(w)); + watchers = move(pruned); + } +} + +bool BhvImplBase::needsUpdate(const BhvCurTime &) const +{ + return true; +} + +void BhvImplBase::doUpdate(const BhvCurTime &) +{ +} diff --git a/src/identity.cpp b/src/identity.cpp index 374a872..a4c12f2 100644 --- a/src/identity.cpp +++ b/src/identity.cpp @@ -77,6 +77,16 @@ bool Identity::sameAs(const Identity & other) const other.p->data[0]->data->keyIdentity; } +bool Identity::operator==(const Identity & other) const +{ + return p->data == other.p->data; +} + +bool Identity::operator!=(const Identity & other) const +{ + return p->data != other.p->data; +} + optional Identity::ref() const { if (p->data.size() == 1) diff --git a/src/network.cpp b/src/network.cpp index f33c097..259ae5e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -38,6 +38,11 @@ const Head & Server::localHead() const return p->localHead; } +const Bhv & Server::localState() const +{ + return p->localState; +} + const Identity & Server::identity() const { return p->self; @@ -203,6 +208,7 @@ Server::Priv::Priv(const Head & local, const Identity & self, vector> && svcs): self(self), // Watching needs to start after self is initialized + localState(local.behavior()), localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))), services(std::move(svcs)) { diff --git a/src/network.h b/src/network.h index 6ebd60c..fe7d7b4 100644 --- a/src/network.h +++ b/src/network.h @@ -158,6 +158,7 @@ struct Server::Priv : enable_shared_from_this bool finish = false; Identity self; + Bhv localState; WatchedHead localHead; vector> services; -- cgit v1.2.3