diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/frp.cpp | 133 | ||||
-rw-r--r-- | src/identity.cpp | 10 | ||||
-rw-r--r-- | src/network.cpp | 6 | ||||
-rw-r--r-- | src/network.h | 1 |
5 files changed, 151 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 61f491f..8f65555 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories( add_library(erebos attach channel + frp identity message network diff --git a/src/frp.cpp b/src/frp.cpp new file mode 100644 index 0000000..142fcd4 --- /dev/null +++ b/src/frp.cpp @@ -0,0 +1,133 @@ +#include <erebos/frp.h> + +#include <condition_variable> +#include <mutex> + +using namespace erebos; + +using std::condition_variable; +using std::move; +using std::mutex; +using std::nullopt; +using std::unique_lock; +using std::weak_ptr; + +mutex bhvTimeMutex; +condition_variable bhvTimeCond; +bool bhvTimeRunning = false; +uint64_t bhvTimeLast = 0; + +BhvCurTime::BhvCurTime() +{ + unique_lock lock(bhvTimeMutex); + bhvTimeCond.wait(lock, []{ return !bhvTimeRunning; }); + + bhvTimeRunning = true; + t = BhvTime(++bhvTimeLast); +} + +BhvCurTime::~BhvCurTime() +{ + if (t) { + unique_lock lock(bhvTimeMutex); + bhvTimeRunning = false; + lock.unlock(); + bhvTimeCond.notify_one(); + } +} + +BhvCurTime::BhvCurTime(BhvCurTime && other) +{ + t = other.t; + other.t = nullopt; +} + +BhvCurTime & BhvCurTime::operator=(BhvCurTime && other) +{ + t = other.t; + other.t = nullopt; + return *this; +} + + +BhvImplBase::~BhvImplBase() = default; + +void BhvImplBase::dependsOn(shared_ptr<BhvImplBase> other) +{ + depends.push_back(other); + other->rdepends.push_back(shared_from_this()); +} + +void BhvImplBase::updated(const BhvCurTime & ctime) +{ + vector<shared_ptr<BhvImplBase>> toUpdate; + markDirty(ctime, toUpdate); + + for (auto & bhv : toUpdate) + bhv->updateDirty(ctime); +} + +void BhvImplBase::markDirty(const BhvCurTime & ctime, vector<shared_ptr<BhvImplBase>> & toUpdate) +{ + if (dirty) + return; + + if (!needsUpdate(ctime)) + return; + + dirty = true; + toUpdate.push_back(shared_from_this()); + + bool prune = false; + for (const auto & w : rdepends) { + if (auto b = w.lock()) + b->markDirty(ctime, toUpdate); + else + prune = true; + } + + if (prune) { + decltype(rdepends) pruned; + for (const auto & w : rdepends) + if (!w.expired()) + pruned.push_back(move(w)); + rdepends = move(pruned); + } +} + +void BhvImplBase::updateDirty(const BhvCurTime & ctime) +{ + if (!dirty) + return; + + for (auto & d : depends) + d->updateDirty(ctime); + + doUpdate(ctime); + dirty = false; + + bool prune = false; + for (const auto & wcb : watchers) { + if (auto cb = wcb.lock()) + (*cb)(ctime); + else + prune = true; + } + + if (prune) { + decltype(watchers) pruned; + for (const auto & w : watchers) + if (!w.expired()) + pruned.push_back(move(w)); + watchers = move(pruned); + } +} + +bool BhvImplBase::needsUpdate(const BhvCurTime &) const +{ + return true; +} + +void BhvImplBase::doUpdate(const BhvCurTime &) +{ +} diff --git a/src/identity.cpp b/src/identity.cpp index 374a872..a4c12f2 100644 --- a/src/identity.cpp +++ b/src/identity.cpp @@ -77,6 +77,16 @@ bool Identity::sameAs(const Identity & other) const other.p->data[0]->data->keyIdentity; } +bool Identity::operator==(const Identity & other) const +{ + return p->data == other.p->data; +} + +bool Identity::operator!=(const Identity & other) const +{ + return p->data != other.p->data; +} + optional<Ref> Identity::ref() const { if (p->data.size() == 1) diff --git a/src/network.cpp b/src/network.cpp index f33c097..259ae5e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -38,6 +38,11 @@ const Head<LocalState> & Server::localHead() const return p->localHead; } +const Bhv<LocalState> & Server::localState() const +{ + return p->localState; +} + const Identity & Server::identity() const { return p->self; @@ -203,6 +208,7 @@ Server::Priv::Priv(const Head<LocalState> & local, const Identity & self, vector<unique_ptr<Service>> && svcs): self(self), // Watching needs to start after self is initialized + localState(local.behavior()), localHead(local.watch(std::bind(&Priv::handleLocalHeadChange, this, std::placeholders::_1))), services(std::move(svcs)) { diff --git a/src/network.h b/src/network.h index 6ebd60c..fe7d7b4 100644 --- a/src/network.h +++ b/src/network.h @@ -158,6 +158,7 @@ struct Server::Priv : enable_shared_from_this<Server::Priv> bool finish = false; Identity self; + Bhv<LocalState> localState; WatchedHead<LocalState> localHead; vector<unique_ptr<Service>> services; |