summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/identity.h2
-rw-r--r--include/erebos/message.h90
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/identity.cpp7
-rw-r--r--src/message.cpp273
-rw-r--r--src/message.h57
6 files changed, 430 insertions, 0 deletions
diff --git a/include/erebos/identity.h b/include/erebos/identity.h
index dce6093..7ba3f29 100644
--- a/include/erebos/identity.h
+++ b/include/erebos/identity.h
@@ -16,6 +16,8 @@ public:
Stored<class PublicKey> keyMessage() const;
+ bool sameAs(const Identity &) const;
+
std::optional<Ref> ref() const;
class Builder
diff --git a/include/erebos/message.h b/include/erebos/message.h
new file mode 100644
index 0000000..6415f92
--- /dev/null
+++ b/include/erebos/message.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <erebos/service.h>
+
+#include <chrono>
+#include <functional>
+#include <optional>
+#include <string>
+
+namespace erebos {
+
+class Identity;
+
+class DirectMessage
+{
+public:
+ const Identity & from() const;
+ const ZonedTime & time() const;
+ const std::string & text() const;
+
+private:
+ friend class DirectMessageThread;
+ friend class DirectMessageService;
+ struct Priv;
+ DirectMessage(Priv *);
+ std::shared_ptr<Priv> p;
+};
+
+class DirectMessageThread
+{
+public:
+ class Iterator
+ {
+ struct Priv;
+ Iterator(Priv *);
+ public:
+ using iterator_category = std::forward_iterator_tag;
+ using value_type = DirectMessage;
+ using difference_type = ssize_t;
+ using pointer = const DirectMessage *;
+ using reference = const DirectMessage &;
+
+ Iterator(const Iterator &);
+ ~Iterator();
+ Iterator & operator=(const Iterator &);
+ Iterator & operator++();
+ value_type operator*() const;
+ bool operator==(const Iterator &) const;
+ bool operator!=(const Iterator &) const;
+
+ private:
+ friend DirectMessageThread;
+ std::unique_ptr<Priv> p;
+ };
+
+ Iterator begin() const;
+ Iterator end() const;
+
+ size_t size() const;
+ DirectMessage at(size_t) const;
+
+ const Identity & peer() const;
+
+private:
+ friend class DirectMessageService;
+ struct Priv;
+ DirectMessageThread(Priv *);
+ std::shared_ptr<Priv> p;
+};
+
+class DirectMessageService : public Service
+{
+public:
+ DirectMessageService();
+ virtual ~DirectMessageService();
+
+ UUID uuid() const override;
+ void handle(Context &) const override;
+
+ typedef std::function<void(const DirectMessageThread &, ssize_t, ssize_t)> ThreadWatcher;
+ static void onUpdate(ThreadWatcher);
+ static DirectMessageThread thread(const Identity &);
+
+ static DirectMessage send(const Identity &, const Peer &, const std::string &);
+
+private:
+ struct Priv;
+};
+
+}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 2bbc7d0..d6d5441 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,7 @@ include_directories(
add_library(erebos
channel
identity
+ message
network
pubkey
service
diff --git a/src/identity.cpp b/src/identity.cpp
index 8f606ae..3cba97c 100644
--- a/src/identity.cpp
+++ b/src/identity.cpp
@@ -58,6 +58,13 @@ Stored<PublicKey> Identity::keyMessage() const
return p->keyMessage;
}
+bool Identity::sameAs(const Identity & other) const
+{
+ // TODO: proper identity check
+ return p->data[0]->data->keyIdentity ==
+ other.p->data[0]->data->keyIdentity;
+}
+
optional<Ref> Identity::ref() const
{
if (p->data.size() == 1)
diff --git a/src/message.cpp b/src/message.cpp
new file mode 100644
index 0000000..0fcb587
--- /dev/null
+++ b/src/message.cpp
@@ -0,0 +1,273 @@
+#include "message.h"
+
+#include <erebos/network.h>
+
+using namespace erebos;
+using std::nullopt;
+using std::scoped_lock;
+using std::unique_lock;
+
+static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
+
+static vector<DirectMessageThread> threadList;
+static mutex threadLock;
+
+DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer)
+{
+ for (const auto & t : threadList)
+ if (t.p->peer.sameAs(peer))
+ return t;
+
+ DirectMessageThread t(new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = {},
+ });
+ threadList.push_back(t);
+ return t;
+}
+
+DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head)
+{
+ DirectMessageThread nt(new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = std::move(head),
+ });
+
+ for (auto & t : threadList)
+ if (t.p->peer.sameAs(peer)) {
+ t = nt;
+ return nt;
+ }
+
+ threadList.push_back(nt);
+ return nt;
+}
+
+
+DirectMessage::DirectMessage(Priv * p):
+ p(p)
+{}
+
+optional<DirectMessageData> DirectMessageData::load(const Ref & ref)
+{
+ auto rec = ref->asRecord();
+ if (!rec)
+ return nullopt;
+
+ vector<Stored<DirectMessageData>> prev;
+ for (auto p : rec->items("PREV"))
+ if (const auto & x = p.as<DirectMessageData>())
+ prev.push_back(*x);
+
+ auto fref = rec->item("from").asRef();
+ if (!fref)
+ return nullopt;
+
+ auto from = Identity::load(*fref);
+ if (!from)
+ return nullopt;
+
+ return DirectMessageData {
+ .prev = std::move(prev),
+ .from = *from,
+ .time = *rec->item("time").asDate(),
+ .text = rec->item("text").asText().value(),
+ };
+}
+
+Ref DirectMessageData::store(const Storage & st) const
+{
+ vector<Record::Item> items;
+
+ for (const auto prev : prev)
+ items.emplace_back("PREV", prev.ref());
+ items.emplace_back("from", from.ref().value());
+ items.emplace_back("time", time);
+ items.emplace_back("text", text);
+
+ return st.storeObject(Record(std::move(items)));
+}
+
+
+const Identity & DirectMessage::from() const
+{
+ return p->data->from;
+}
+
+const ZonedTime & DirectMessage::time() const
+{
+ return p->data->time;
+}
+
+const string & DirectMessage::text() const
+{
+ return p->data->text;
+}
+
+
+DirectMessageThread::DirectMessageThread(Priv * p):
+ p(p)
+{}
+
+DirectMessageThread::Iterator::Iterator(Priv * p):
+ p(p)
+{}
+
+DirectMessageThread::Iterator::Iterator(const Iterator & other):
+ Iterator(new Priv(*other.p))
+{}
+
+DirectMessageThread::Iterator::~Iterator() = default;
+
+DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator=(const Iterator & other)
+{
+ p.reset(new Priv(*other.p));
+ return *this;
+}
+
+DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator++()
+{
+ if (p->current)
+ for (const auto & m : p->current->p->data->prev)
+ p->next.push_back(m);
+
+ if (p->next.empty()) {
+ p->current.reset();
+ } else {
+ filterAncestors(p->next);
+ auto ncur = p->next[0];
+
+ for (const auto & m : p->next)
+ if (m->time.time >= ncur->time.time)
+ ncur = m;
+
+ p->current.emplace(DirectMessage(new DirectMessage::Priv {
+ .data = ncur,
+ }));
+
+ p->next.erase(std::remove(p->next.begin(), p->next.end(), p->current->p->data));
+ }
+
+ return *this;
+}
+
+DirectMessage DirectMessageThread::Iterator::operator*() const
+{
+ return *p->current;
+}
+
+bool DirectMessageThread::Iterator::operator==(const Iterator & other) const
+{
+ if (p->current && other.p->current)
+ return p->current->p->data == other.p->current->p->data;
+ return bool(p->current) == bool(other.p->current);
+}
+
+bool DirectMessageThread::Iterator::operator!=(const Iterator & other) const
+{
+ return !(*this == other);
+}
+
+DirectMessageThread::Iterator DirectMessageThread::begin() const
+{
+ return ++Iterator(new Iterator::Priv {
+ .current = {},
+ .next = p->head,
+ });
+}
+
+DirectMessageThread::Iterator DirectMessageThread::end() const
+{
+ return Iterator(new Iterator::Priv {
+ .current = {},
+ .next = {},
+ });
+}
+
+size_t DirectMessageThread::size() const
+{
+ size_t c = 0;
+ for (auto it = begin(); it != end(); ++it)
+ c++;
+ return c;
+}
+
+DirectMessage DirectMessageThread::at(size_t i) const
+{
+ return *std::next(begin(), i);
+}
+
+const Identity & DirectMessageThread::peer() const
+{
+ return p->peer;
+}
+
+
+vector<DirectMessageService::ThreadWatcher> DirectMessageService::Priv::watchers;
+mutex DirectMessageService::Priv::watcherLock;
+
+DirectMessageService::DirectMessageService() = default;
+DirectMessageService::~DirectMessageService() = default;
+
+UUID DirectMessageService::uuid() const
+{
+ return myUUID;
+}
+
+void DirectMessageService::handle(Context & ctx) const
+{
+ auto dm = Stored<DirectMessageData>::load(ctx.ref());
+ if (!dm)
+ return;
+
+ auto pid = ctx.peer().identity();
+ if (!pid)
+ return;
+
+ unique_lock lock(threadLock);
+
+ vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(*pid).p->head);
+ head.push_back(*dm);
+ filterAncestors(head);
+ auto dmt = DirectMessageThread::Priv::updateThreadLocked(*pid, std::move(head));
+
+ lock.unlock();
+
+ for (const auto & w : Priv::watchers)
+ w(dmt, -1, -1);
+}
+
+void DirectMessageService::onUpdate(ThreadWatcher w)
+{
+ scoped_lock l(Priv::watcherLock);
+ Priv::watchers.push_back(w);
+}
+
+DirectMessageThread DirectMessageService::thread(const Identity & peer)
+{
+ scoped_lock lock(threadLock);
+ return DirectMessageThread::Priv::getThreadLocked(peer);
+}
+
+DirectMessage DirectMessageService::send(const Identity & from, const Peer & peer, const string & text)
+{
+ auto pid = peer.identity();
+ if (!pid)
+ throw std::runtime_error("Peer without known identity");
+
+ scoped_lock lock(threadLock);
+
+ auto msg = from.ref()->storage().store(DirectMessageData {
+ .prev = DirectMessageThread::Priv::getThreadLocked(*pid).p->head,
+ .from = from,
+ .time = ZonedTime::now(),
+ .text = text,
+ });
+
+ DirectMessageThread::Priv::updateThreadLocked(*pid, { msg });
+ peer.send(myUUID, msg.ref());
+
+ return DirectMessage(new DirectMessage::Priv {
+ .data = msg,
+ });
+}
diff --git a/src/message.h b/src/message.h
new file mode 100644
index 0000000..48487ba
--- /dev/null
+++ b/src/message.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <erebos/identity.h>
+#include <erebos/message.h>
+#include <erebos/storage.h>
+
+#include <mutex>
+#include <vector>
+
+namespace chrono = std::chrono;
+using chrono::system_clock;
+using std::mutex;
+using std::optional;
+using std::string;
+using std::vector;
+
+namespace erebos {
+
+struct DirectMessageData
+{
+ static optional<DirectMessageData> load(const Ref &);
+ Ref store(const Storage &) const;
+
+ vector<Stored<DirectMessageData>> prev;
+ Identity from;
+ ZonedTime time;
+ string text;
+};
+
+struct DirectMessage::Priv
+{
+ Stored<DirectMessageData> data;
+};
+
+struct DirectMessageThread::Priv
+{
+ const Identity peer;
+ const vector<Stored<DirectMessageData>> head;
+
+ static DirectMessageThread getThreadLocked(const Identity & peer);
+ static DirectMessageThread updateThreadLocked(const Identity & peer,
+ vector<Stored<DirectMessageData>> && head);
+};
+
+struct DirectMessageThread::Iterator::Priv
+{
+ optional<DirectMessage> current;
+ vector<Stored<DirectMessageData>> next;
+};
+
+struct DirectMessageService::Priv
+{
+ static vector<ThreadWatcher> watchers;
+ static mutex watcherLock;
+};
+
+}