summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/frp.cpp133
-rw-r--r--src/identity.cpp10
-rw-r--r--src/network.cpp6
-rw-r--r--src/network.h1
5 files changed, 151 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 61f491f..8f65555 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,7 @@ include_directories(
add_library(erebos
attach
channel
+ frp
identity
message
network
diff --git a/src/frp.cpp b/src/frp.cpp
new file mode 100644
index 0000000..142fcd4
--- /dev/null
+++ b/src/frp.cpp
@@ -0,0 +1,133 @@
+#include <erebos/frp.h>
+
+#include <condition_variable>
+#include <mutex>
+
+using namespace erebos;
+
+using std::condition_variable;
+using std::move;
+using std::mutex;
+using std::nullopt;
+using std::unique_lock;
+using std::weak_ptr;
+
+mutex bhvTimeMutex;
+condition_variable bhvTimeCond;
+bool bhvTimeRunning = false;
+uint64_t bhvTimeLast = 0;
+
+BhvCurTime::BhvCurTime()
+{
+ unique_lock lock(bhvTimeMutex);
+ bhvTimeCond.wait(lock, []{ return !bhvTimeRunning; });
+
+ bhvTimeRunning = true;
+ t = BhvTime(++bhvTimeLast);
+}
+
+BhvCurTime::~BhvCurTime()
+{
+ if (t) {
+ unique_lock lock(bhvTimeMutex);
+ bhvTimeRunning = false;
+ lock.unlock();
+ bhvTimeCond.notify_one();
+ }
+}
+
+BhvCurTime::BhvCurTime(BhvCurTime && other)
+{
+ t = other.t;
+ other.t = nullopt;
+}
+
+BhvCurTime & BhvCurTime::operator=(BhvCurTime && other)
+{
+ t = other.t;
+ other.t = nullopt;
+ return *this;
+}
+
+
+BhvImplBase::~BhvImplBase() = default;
+
+void BhvImplBase::dependsOn(shared_ptr<BhvImplBase> other)
+{
+ depends.push_back(other);
+ other->rdepends.push_back(shared_from_this());
+}
+
+void BhvImplBase::updated(const BhvCurTime & ctime)
+{
+ vector<shared_ptr<BhvImplBase>> toUpdate;
+ markDirty(ctime, toUpdate);
+
+ for (auto & bhv : toUpdate)
+ bhv->updateDirty(ctime);
+}
+
+void BhvImplBase::markDirty(const BhvCurTime & ctime, vector<shared_ptr<BhvImplBase>> & toUpdate)
+{
+ if (dirty)
+ return;
+
+ if (!needsUpdate(ctime))
+ return;
+
+ dirty = true;
+ toUpdate.push_back(shared_from_this());
+
+ bool prune = false;
+ for (const auto & w : rdepends) {
+ if (auto b = w.lock())
+ b->markDirty(ctime, toUpdate);
+ else
+ prune = true;
+ }
+
+ if (prune) {
+ decltype(rdepends) pruned;
+ for (const auto & w : rdepends)
+ if (!w.expired())
+ pruned.push_back(move(w));
+ rdepends = move(pruned);
+ }
+}
+
+void BhvImplBase::updateDirty(const BhvCurTime & ctime)
+{
+ if (!dirty)
+ return;
+
+ for (auto & d : depends)
+ d->updateDirty(ctime);
+
+ doUpdate(ctime);
+ dirty = false;
+
+ bool prune = false;
+ for (const auto & wcb : watchers) {
+ if (auto cb = wcb.lock())
+ (*cb)(ctime);
+ else
+ prune = true;
+ }
+
+ if (prune) {
+ decltype(watchers) pruned;
+ for (const auto & w : watchers)
+ if (!w.expired())
+ pruned.push_back(move(w));
+ watchers = move(pruned);
+ }
+}
+
+bool BhvImplBase::needsUpdate(const BhvCurTime &) const
+{
+ return true;
+}
+
+void BhvImplBase::doUpdate(const BhvCurTime &)
+{
+}
diff --git a/src/identity.cpp b/src/identity.cpp
index 374a872..a4c12f2 100644
--- a/src/identity.cpp
+++ b/src/identity.cpp
@@ -77,6 +77,16 @@ bool Identity::sameAs(const Identity & other) const
other.p->data[0]->data->keyIdentity;
}
+bool Identity::operator==(const Identity & other) const
+{
+ return p->data == other.p->data;
+}
+
+bool Identity::operator!=(const Identity & other) const
+{
+ return p->data != other.p->data;
+}
+
optional<Ref> Identity::ref() const
{
if (p->data.size() == 1)
diff --git a/src/network.cpp b/src/network.cpp
index f33c097..259ae5e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -38,6 +38,11 @@ const Head<LocalState> & Server::localHead() const
return p->localHead;
}
+const Bhv<LocalState> & Server::localState() const
+{
+ return p->localState;
+}
+
const Identity & Server::identity() const
{
return p->self;
@@ -203,6 +208,7 @@ Server::Priv::Priv(const Head<LocalState> & local, const Identity & self,
vector<unique_ptr<Service>> && svcs):
self(self),
// Watching needs to start after self is initialized
+ localState(local.behavior()),
localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))),
services(std::move(svcs))
{
diff --git a/src/network.h b/src/network.h
index 6ebd60c..fe7d7b4 100644
--- a/src/network.h
+++ b/src/network.h
@@ -158,6 +158,7 @@ struct Server::Priv : enable_shared_from_this<Server::Priv>
bool finish = false;
Identity self;
+ Bhv<LocalState> localState;
WatchedHead<LocalState> localHead;
vector<unique_ptr<Service>> services;