From 29ade9784fe65ecd686b5e8e18d84e6acc30b37a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 14 Mar 2020 22:43:44 +0100 Subject: Direct message service --- include/erebos/identity.h | 2 + include/erebos/message.h | 90 +++++++++++++++ src/CMakeLists.txt | 1 + src/identity.cpp | 7 ++ src/message.cpp | 273 ++++++++++++++++++++++++++++++++++++++++++++++ src/message.h | 57 ++++++++++ 6 files changed, 430 insertions(+) create mode 100644 include/erebos/message.h create mode 100644 src/message.cpp create mode 100644 src/message.h diff --git a/include/erebos/identity.h b/include/erebos/identity.h index dce6093..7ba3f29 100644 --- a/include/erebos/identity.h +++ b/include/erebos/identity.h @@ -16,6 +16,8 @@ public: Stored keyMessage() const; + bool sameAs(const Identity &) const; + std::optional ref() const; class Builder diff --git a/include/erebos/message.h b/include/erebos/message.h new file mode 100644 index 0000000..6415f92 --- /dev/null +++ b/include/erebos/message.h @@ -0,0 +1,90 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace erebos { + +class Identity; + +class DirectMessage +{ +public: + const Identity & from() const; + const ZonedTime & time() const; + const std::string & text() const; + +private: + friend class DirectMessageThread; + friend class DirectMessageService; + struct Priv; + DirectMessage(Priv *); + std::shared_ptr p; +}; + +class DirectMessageThread +{ +public: + class Iterator + { + struct Priv; + Iterator(Priv *); + public: + using iterator_category = std::forward_iterator_tag; + using value_type = DirectMessage; + using difference_type = ssize_t; + using pointer = const DirectMessage *; + using reference = const DirectMessage &; + + Iterator(const Iterator &); + ~Iterator(); + Iterator & operator=(const Iterator &); + Iterator & operator++(); + value_type operator*() const; + bool operator==(const Iterator &) const; + bool operator!=(const Iterator &) const; + + private: + friend DirectMessageThread; + std::unique_ptr p; + }; + + Iterator begin() const; + Iterator end() const; + + size_t size() const; + DirectMessage at(size_t) const; + + const Identity & peer() const; + +private: + friend class DirectMessageService; + struct Priv; + DirectMessageThread(Priv *); + std::shared_ptr p; +}; + +class DirectMessageService : public Service +{ +public: + DirectMessageService(); + virtual ~DirectMessageService(); + + UUID uuid() const override; + void handle(Context &) const override; + + typedef std::function ThreadWatcher; + static void onUpdate(ThreadWatcher); + static DirectMessageThread thread(const Identity &); + + static DirectMessage send(const Identity &, const Peer &, const std::string &); + +private: + struct Priv; +}; + +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2bbc7d0..d6d5441 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories( add_library(erebos channel identity + message network pubkey service diff --git a/src/identity.cpp b/src/identity.cpp index 8f606ae..3cba97c 100644 --- a/src/identity.cpp +++ b/src/identity.cpp @@ -58,6 +58,13 @@ Stored Identity::keyMessage() const return p->keyMessage; } +bool Identity::sameAs(const Identity & other) const +{ + // TODO: proper identity check + return p->data[0]->data->keyIdentity == + other.p->data[0]->data->keyIdentity; +} + optional Identity::ref() const { if (p->data.size() == 1) diff --git a/src/message.cpp b/src/message.cpp new file mode 100644 index 0000000..0fcb587 --- /dev/null +++ b/src/message.cpp @@ -0,0 +1,273 @@ +#include "message.h" + +#include + +using namespace erebos; +using std::nullopt; +using std::scoped_lock; +using std::unique_lock; + +static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); + +static vector threadList; +static mutex threadLock; + +DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) +{ + for (const auto & t : threadList) + if (t.p->peer.sameAs(peer)) + return t; + + DirectMessageThread t(new DirectMessageThread::Priv { + .peer = peer, + .head = {}, + }); + threadList.push_back(t); + return t; +} + +DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector> && head) +{ + DirectMessageThread nt(new DirectMessageThread::Priv { + .peer = peer, + .head = std::move(head), + }); + + for (auto & t : threadList) + if (t.p->peer.sameAs(peer)) { + t = nt; + return nt; + } + + threadList.push_back(nt); + return nt; +} + + +DirectMessage::DirectMessage(Priv * p): + p(p) +{} + +optional DirectMessageData::load(const Ref & ref) +{ + auto rec = ref->asRecord(); + if (!rec) + return nullopt; + + vector> prev; + for (auto p : rec->items("PREV")) + if (const auto & x = p.as()) + prev.push_back(*x); + + auto fref = rec->item("from").asRef(); + if (!fref) + return nullopt; + + auto from = Identity::load(*fref); + if (!from) + return nullopt; + + return DirectMessageData { + .prev = std::move(prev), + .from = *from, + .time = *rec->item("time").asDate(), + .text = rec->item("text").asText().value(), + }; +} + +Ref DirectMessageData::store(const Storage & st) const +{ + vector items; + + for (const auto prev : prev) + items.emplace_back("PREV", prev.ref()); + items.emplace_back("from", from.ref().value()); + items.emplace_back("time", time); + items.emplace_back("text", text); + + return st.storeObject(Record(std::move(items))); +} + + +const Identity & DirectMessage::from() const +{ + return p->data->from; +} + +const ZonedTime & DirectMessage::time() const +{ + return p->data->time; +} + +const string & DirectMessage::text() const +{ + return p->data->text; +} + + +DirectMessageThread::DirectMessageThread(Priv * p): + p(p) +{} + +DirectMessageThread::Iterator::Iterator(Priv * p): + p(p) +{} + +DirectMessageThread::Iterator::Iterator(const Iterator & other): + Iterator(new Priv(*other.p)) +{} + +DirectMessageThread::Iterator::~Iterator() = default; + +DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator=(const Iterator & other) +{ + p.reset(new Priv(*other.p)); + return *this; +} + +DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator++() +{ + if (p->current) + for (const auto & m : p->current->p->data->prev) + p->next.push_back(m); + + if (p->next.empty()) { + p->current.reset(); + } else { + filterAncestors(p->next); + auto ncur = p->next[0]; + + for (const auto & m : p->next) + if (m->time.time >= ncur->time.time) + ncur = m; + + p->current.emplace(DirectMessage(new DirectMessage::Priv { + .data = ncur, + })); + + p->next.erase(std::remove(p->next.begin(), p->next.end(), p->current->p->data)); + } + + return *this; +} + +DirectMessage DirectMessageThread::Iterator::operator*() const +{ + return *p->current; +} + +bool DirectMessageThread::Iterator::operator==(const Iterator & other) const +{ + if (p->current && other.p->current) + return p->current->p->data == other.p->current->p->data; + return bool(p->current) == bool(other.p->current); +} + +bool DirectMessageThread::Iterator::operator!=(const Iterator & other) const +{ + return !(*this == other); +} + +DirectMessageThread::Iterator DirectMessageThread::begin() const +{ + return ++Iterator(new Iterator::Priv { + .current = {}, + .next = p->head, + }); +} + +DirectMessageThread::Iterator DirectMessageThread::end() const +{ + return Iterator(new Iterator::Priv { + .current = {}, + .next = {}, + }); +} + +size_t DirectMessageThread::size() const +{ + size_t c = 0; + for (auto it = begin(); it != end(); ++it) + c++; + return c; +} + +DirectMessage DirectMessageThread::at(size_t i) const +{ + return *std::next(begin(), i); +} + +const Identity & DirectMessageThread::peer() const +{ + return p->peer; +} + + +vector DirectMessageService::Priv::watchers; +mutex DirectMessageService::Priv::watcherLock; + +DirectMessageService::DirectMessageService() = default; +DirectMessageService::~DirectMessageService() = default; + +UUID DirectMessageService::uuid() const +{ + return myUUID; +} + +void DirectMessageService::handle(Context & ctx) const +{ + auto dm = Stored::load(ctx.ref()); + if (!dm) + return; + + auto pid = ctx.peer().identity(); + if (!pid) + return; + + unique_lock lock(threadLock); + + vector> head(DirectMessageThread::Priv::getThreadLocked(*pid).p->head); + head.push_back(*dm); + filterAncestors(head); + auto dmt = DirectMessageThread::Priv::updateThreadLocked(*pid, std::move(head)); + + lock.unlock(); + + for (const auto & w : Priv::watchers) + w(dmt, -1, -1); +} + +void DirectMessageService::onUpdate(ThreadWatcher w) +{ + scoped_lock l(Priv::watcherLock); + Priv::watchers.push_back(w); +} + +DirectMessageThread DirectMessageService::thread(const Identity & peer) +{ + scoped_lock lock(threadLock); + return DirectMessageThread::Priv::getThreadLocked(peer); +} + +DirectMessage DirectMessageService::send(const Identity & from, const Peer & peer, const string & text) +{ + auto pid = peer.identity(); + if (!pid) + throw std::runtime_error("Peer without known identity"); + + scoped_lock lock(threadLock); + + auto msg = from.ref()->storage().store(DirectMessageData { + .prev = DirectMessageThread::Priv::getThreadLocked(*pid).p->head, + .from = from, + .time = ZonedTime::now(), + .text = text, + }); + + DirectMessageThread::Priv::updateThreadLocked(*pid, { msg }); + peer.send(myUUID, msg.ref()); + + return DirectMessage(new DirectMessage::Priv { + .data = msg, + }); +} diff --git a/src/message.h b/src/message.h new file mode 100644 index 0000000..48487ba --- /dev/null +++ b/src/message.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include + +#include +#include + +namespace chrono = std::chrono; +using chrono::system_clock; +using std::mutex; +using std::optional; +using std::string; +using std::vector; + +namespace erebos { + +struct DirectMessageData +{ + static optional load(const Ref &); + Ref store(const Storage &) const; + + vector> prev; + Identity from; + ZonedTime time; + string text; +}; + +struct DirectMessage::Priv +{ + Stored data; +}; + +struct DirectMessageThread::Priv +{ + const Identity peer; + const vector> head; + + static DirectMessageThread getThreadLocked(const Identity & peer); + static DirectMessageThread updateThreadLocked(const Identity & peer, + vector> && head); +}; + +struct DirectMessageThread::Iterator::Priv +{ + optional current; + vector> next; +}; + +struct DirectMessageService::Priv +{ + static vector watchers; + static mutex watcherLock; +}; + +} -- cgit v1.2.3