summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2021-04-18 22:25:17 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2021-04-18 22:28:59 +0200
commit9aaba1211c95dc7e08437a7cca73452181e296d6 (patch)
treeb11cb72c1515fb7b284e98708d6b6f4a8fbae3ab
parentc3d6046b25ef0786b8d2919dfa9db4eb05114501 (diff)
Initial support for FRP behaviors
-rw-r--r--include/erebos/frp.h230
-rw-r--r--include/erebos/identity.h2
-rw-r--r--include/erebos/network.h1
-rw-r--r--include/erebos/storage.h26
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/frp.cpp133
-rw-r--r--src/identity.cpp10
-rw-r--r--src/network.cpp6
-rw-r--r--src/network.h1
9 files changed, 410 insertions, 0 deletions
diff --git a/include/erebos/frp.h b/include/erebos/frp.h
new file mode 100644
index 0000000..587d2b6
--- /dev/null
+++ b/include/erebos/frp.h
@@ -0,0 +1,230 @@
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <functional>
+#include <type_traits>
+#include <tuple>
+#include <variant>
+
+namespace erebos {
+
+using std::enable_if_t;
+using std::function;
+using std::is_same_v;
+using std::make_shared;
+using std::monostate;
+using std::optional;
+using std::shared_ptr;
+using std::static_pointer_cast;
+using std::tuple;
+using std::vector;
+using std::weak_ptr;
+
+class BhvTime
+{
+ BhvTime(uint64_t t): t(t) {}
+ friend class BhvCurTime;
+public:
+
+private:
+ uint64_t t;
+};
+
+class BhvCurTime
+{
+public:
+ BhvCurTime();
+ ~BhvCurTime();
+ BhvCurTime(const BhvCurTime &) = delete;
+ BhvCurTime(BhvCurTime &&);
+
+ BhvCurTime & operator=(const BhvCurTime &) = delete;
+ BhvCurTime & operator=(BhvCurTime &&);
+
+private:
+ optional<BhvTime> t;
+};
+
+template<typename T>
+class Watched
+{
+public:
+ Watched(shared_ptr<function<void(const BhvCurTime &)>> && cb):
+ cb(move(cb)) {}
+ ~Watched();
+
+private:
+ shared_ptr<function<void(const BhvCurTime &)>> cb;
+};
+
+template<typename T>
+Watched<T>::~Watched()
+{
+ BhvCurTime ctime;
+ cb.reset();
+}
+
+class BhvImplBase : public std::enable_shared_from_this<BhvImplBase>
+{
+public:
+ virtual ~BhvImplBase();
+
+protected:
+ void dependsOn(shared_ptr<BhvImplBase> other);
+ void updated(const BhvCurTime &);
+ virtual bool needsUpdate(const BhvCurTime &) const;
+ virtual void doUpdate(const BhvCurTime &);
+
+ vector<weak_ptr<function<void(const BhvCurTime &)>>> watchers;
+private:
+ void markDirty(const BhvCurTime &, vector<shared_ptr<BhvImplBase>> &);
+ void updateDirty(const BhvCurTime &);
+
+ bool dirty = false;
+ vector<shared_ptr<BhvImplBase>> depends;
+ vector<weak_ptr<BhvImplBase>> rdepends;
+
+ template<typename A, typename B> friend class BhvFun;
+};
+
+template<typename A, typename B>
+class BhvImpl : public BhvImplBase
+{
+public:
+ virtual B get(const BhvCurTime &, const A &) const = 0;
+};
+
+template<typename A>
+using BhvSource = BhvImpl<monostate, A>;
+
+template<typename A, typename B>
+class BhvFun
+{
+public:
+ BhvFun(shared_ptr<BhvImpl<A, B>> impl):
+ impl(move(impl)) {}
+
+ template<typename T> BhvFun(shared_ptr<T> impl):
+ BhvFun(static_pointer_cast<BhvImpl<A, B>>(impl)) {}
+
+ B get(const A & x) const
+ {
+ BhvCurTime ctime;
+ return impl->get(ctime, x);
+ }
+
+ const shared_ptr<BhvImpl<A, B>> impl;
+};
+
+template<typename A>
+class BhvFun<monostate, A>
+{
+public:
+ BhvFun(shared_ptr<BhvSource<A>> impl):
+ impl(move(impl)) {}
+
+ template<typename T> BhvFun(shared_ptr<T> impl):
+ BhvFun(static_pointer_cast<BhvSource<A>>(impl)) {}
+
+ A get() const
+ {
+ BhvCurTime ctime;
+ return impl->get(ctime, monostate());
+ }
+ Watched<A> watch(function<void(const A &)>);
+
+ const shared_ptr<BhvSource<A>> impl;
+};
+
+template<typename A>
+using Bhv = BhvFun<monostate, A>;
+
+template<typename A>
+Watched<A> Bhv<A>::watch(function<void(const A &)> f)
+{
+ auto cb = make_shared<function<void(const BhvCurTime &)>>(
+ [impl = BhvFun<monostate, A>::impl, f] (const BhvCurTime & ctime) {
+ f(impl->get(ctime, monostate()));
+ });
+ BhvFun<monostate, A>::impl->watchers.push_back(cb);
+ return Watched<A>(move(cb));
+}
+
+
+template<typename A, typename B>
+class BhvLambda : public BhvImpl<A, B>
+{
+public:
+ BhvLambda(function<B(const A &)> f): f(f) {}
+
+ B get(const BhvCurTime &, const A & x) const override
+ { return f(x); }
+
+private:
+ function<B(const A &)> f;
+};
+
+template<typename A, typename B>
+BhvFun<A, B> bfun(function<B(const A &)> f)
+{
+ return make_shared<BhvLambda<A, B>>(f);
+}
+
+
+template<typename A, typename B, typename C> class BhvComp;
+template<typename A, typename B, typename C>
+BhvFun<A, C> operator>>(const BhvFun<A, B> & f, const BhvFun<B, C> & g);
+
+template<typename A, typename B, typename C>
+class BhvComp : public BhvImpl<A, C>
+{
+public:
+ BhvComp(const BhvFun<A, B> & f, const BhvFun<B, C>):
+ f(f), g(g) {}
+
+ C get(const BhvCurTime & ctime, const A & x) const override
+ { return g.impl.get(ctime, f.impl.get(ctime, x)); }
+
+private:
+ BhvFun<A, B> f;
+ BhvFun<B, C> g;
+
+ friend BhvFun<A, C> operator>> <A, B, C>(const BhvFun<A, B> &, const BhvFun<B, C> &);
+};
+
+template<typename B, typename C>
+class BhvComp<monostate, B, C> : public BhvSource<C>
+{
+public:
+ BhvComp(const BhvFun<monostate, B> & f, const BhvFun<B, C> & g):
+ f(f), g(g) {}
+
+ bool needsUpdate(const BhvCurTime & ctime) const override
+ { return !x || g.impl->get(ctime, f.impl->get(ctime, monostate())) != x.value(); }
+
+ void doUpdate(const BhvCurTime & ctime) override
+ { x = g.impl->get(ctime, f.impl->get(ctime, monostate())); }
+
+ C get(const BhvCurTime & ctime, const monostate & m) const override
+ { return x ? x.value() : g.impl->get(ctime, f.impl->get(ctime, m)); }
+
+private:
+ BhvFun<monostate, B> f;
+ BhvFun<B, C> g;
+ optional<C> x;
+
+ friend BhvFun<monostate, C> operator>> <monostate, B, C>(const BhvFun<monostate, B> &, const BhvFun<B, C> &);
+};
+
+template<typename A, typename B, typename C>
+BhvFun<A, C> operator>>(const BhvFun<A, B> & f, const BhvFun<B, C> & g)
+{
+ auto impl = make_shared<BhvComp<A, B, C>>(f, g);
+ impl->dependsOn(f.impl);
+ impl->dependsOn(g.impl);
+ return impl;
+}
+
+}
diff --git a/include/erebos/identity.h b/include/erebos/identity.h
index aeddd08..f7b17b8 100644
--- a/include/erebos/identity.h
+++ b/include/erebos/identity.h
@@ -24,6 +24,8 @@ public:
Stored<class PublicKey> keyMessage() const;
bool sameAs(const Identity &) const;
+ bool operator==(const Identity & other) const;
+ bool operator!=(const Identity & other) const;
std::optional<Ref> ref() const;
diff --git a/include/erebos/network.h b/include/erebos/network.h
index 5a1fca6..f2ae191 100644
--- a/include/erebos/network.h
+++ b/include/erebos/network.h
@@ -17,6 +17,7 @@ public:
~Server();
const Head<LocalState> & localHead() const;
+ const Bhv<LocalState> & localState() const;
const Identity & identity() const;
template<class S> S & svc();
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 7ec73ab..2d00cc3 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -1,5 +1,6 @@
#pragma once
+#include <erebos/frp.h>
#include <erebos/time.h>
#include <erebos/uuid.h>
@@ -496,6 +497,8 @@ public:
std::optional<Head<T>> update(const std::function<Stored<T>(const Stored<T> &)> &) const;
WatchedHead<T> watch(const std::function<void(const Head<T> &)> &) const;
+ Bhv<T> behavior() const;
+
private:
UUID mid;
Stored<T> mstored;
@@ -526,6 +529,23 @@ public:
~WatchedHead();
};
+template<class T>
+class HeadBhv : public BhvSource<T>
+{
+public:
+ HeadBhv(const Head<T> & head):
+ whead(head.watch([this] (const Head<T> & cur) {
+ BhvCurTime ctime;
+ whead = cur;
+ BhvImplBase::updated(ctime);
+ })) {}
+
+ T get(const BhvCurTime &, const std::monostate &) const { return *whead; }
+
+private:
+ WatchedHead<T> whead;
+};
+
template<typename T>
std::optional<Head<T>> Storage::head(UUID id) const
{
@@ -583,6 +603,12 @@ WatchedHead<T> Head<T>::watch(const std::function<void(const Head<T> &)> & watch
return WatchedHead<T>(*this, wid);
}
+template<typename T>
+Bhv<T> Head<T>::behavior() const
+{
+ return make_shared<HeadBhv<T>>(*this);
+}
+
template<class T>
WatchedHead<T>::~WatchedHead()
{
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 61f491f..8f65555 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,7 @@ include_directories(
add_library(erebos
attach
channel
+ frp
identity
message
network
diff --git a/src/frp.cpp b/src/frp.cpp
new file mode 100644
index 0000000..142fcd4
--- /dev/null
+++ b/src/frp.cpp
@@ -0,0 +1,133 @@
+#include <erebos/frp.h>
+
+#include <condition_variable>
+#include <mutex>
+
+using namespace erebos;
+
+using std::condition_variable;
+using std::move;
+using std::mutex;
+using std::nullopt;
+using std::unique_lock;
+using std::weak_ptr;
+
+mutex bhvTimeMutex;
+condition_variable bhvTimeCond;
+bool bhvTimeRunning = false;
+uint64_t bhvTimeLast = 0;
+
+BhvCurTime::BhvCurTime()
+{
+ unique_lock lock(bhvTimeMutex);
+ bhvTimeCond.wait(lock, []{ return !bhvTimeRunning; });
+
+ bhvTimeRunning = true;
+ t = BhvTime(++bhvTimeLast);
+}
+
+BhvCurTime::~BhvCurTime()
+{
+ if (t) {
+ unique_lock lock(bhvTimeMutex);
+ bhvTimeRunning = false;
+ lock.unlock();
+ bhvTimeCond.notify_one();
+ }
+}
+
+BhvCurTime::BhvCurTime(BhvCurTime && other)
+{
+ t = other.t;
+ other.t = nullopt;
+}
+
+BhvCurTime & BhvCurTime::operator=(BhvCurTime && other)
+{
+ t = other.t;
+ other.t = nullopt;
+ return *this;
+}
+
+
+BhvImplBase::~BhvImplBase() = default;
+
+void BhvImplBase::dependsOn(shared_ptr<BhvImplBase> other)
+{
+ depends.push_back(other);
+ other->rdepends.push_back(shared_from_this());
+}
+
+void BhvImplBase::updated(const BhvCurTime & ctime)
+{
+ vector<shared_ptr<BhvImplBase>> toUpdate;
+ markDirty(ctime, toUpdate);
+
+ for (auto & bhv : toUpdate)
+ bhv->updateDirty(ctime);
+}
+
+void BhvImplBase::markDirty(const BhvCurTime & ctime, vector<shared_ptr<BhvImplBase>> & toUpdate)
+{
+ if (dirty)
+ return;
+
+ if (!needsUpdate(ctime))
+ return;
+
+ dirty = true;
+ toUpdate.push_back(shared_from_this());
+
+ bool prune = false;
+ for (const auto & w : rdepends) {
+ if (auto b = w.lock())
+ b->markDirty(ctime, toUpdate);
+ else
+ prune = true;
+ }
+
+ if (prune) {
+ decltype(rdepends) pruned;
+ for (const auto & w : rdepends)
+ if (!w.expired())
+ pruned.push_back(move(w));
+ rdepends = move(pruned);
+ }
+}
+
+void BhvImplBase::updateDirty(const BhvCurTime & ctime)
+{
+ if (!dirty)
+ return;
+
+ for (auto & d : depends)
+ d->updateDirty(ctime);
+
+ doUpdate(ctime);
+ dirty = false;
+
+ bool prune = false;
+ for (const auto & wcb : watchers) {
+ if (auto cb = wcb.lock())
+ (*cb)(ctime);
+ else
+ prune = true;
+ }
+
+ if (prune) {
+ decltype(watchers) pruned;
+ for (const auto & w : watchers)
+ if (!w.expired())
+ pruned.push_back(move(w));
+ watchers = move(pruned);
+ }
+}
+
+bool BhvImplBase::needsUpdate(const BhvCurTime &) const
+{
+ return true;
+}
+
+void BhvImplBase::doUpdate(const BhvCurTime &)
+{
+}
diff --git a/src/identity.cpp b/src/identity.cpp
index 374a872..a4c12f2 100644
--- a/src/identity.cpp
+++ b/src/identity.cpp
@@ -77,6 +77,16 @@ bool Identity::sameAs(const Identity & other) const
other.p->data[0]->data->keyIdentity;
}
+bool Identity::operator==(const Identity & other) const
+{
+ return p->data == other.p->data;
+}
+
+bool Identity::operator!=(const Identity & other) const
+{
+ return p->data != other.p->data;
+}
+
optional<Ref> Identity::ref() const
{
if (p->data.size() == 1)
diff --git a/src/network.cpp b/src/network.cpp
index f33c097..259ae5e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -38,6 +38,11 @@ const Head<LocalState> & Server::localHead() const
return p->localHead;
}
+const Bhv<LocalState> & Server::localState() const
+{
+ return p->localState;
+}
+
const Identity & Server::identity() const
{
return p->self;
@@ -203,6 +208,7 @@ Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,
vector<unique_ptr<Service>> && svcs):
self(self),
// Watching needs to start after self is initialized
+ localState(local.behavior()),
localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))),
services(std::move(svcs))
{
diff --git a/src/network.h b/src/network.h
index 6ebd60c..fe7d7b4 100644
--- a/src/network.h
+++ b/src/network.h
@@ -158,6 +158,7 @@ struct Server::Priv : enable_shared_from_this<Server::Priv>
bool finish = false;
Identity self;
+ Bhv<LocalState> localState;
WatchedHead<LocalState> localHead;
vector<unique_ptr<Service>> services;