summaryrefslogtreecommitdiff
path: root/src/message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/message.cpp')
-rw-r--r--src/message.cpp273
1 files changed, 273 insertions, 0 deletions
diff --git a/src/message.cpp b/src/message.cpp
new file mode 100644
index 0000000..0fcb587
--- /dev/null
+++ b/src/message.cpp
@@ -0,0 +1,273 @@
+#include "message.h"
+
+#include <erebos/network.h>
+
+using namespace erebos;
+using std::nullopt;
+using std::scoped_lock;
+using std::unique_lock;
+
+static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
+
+static vector<DirectMessageThread> threadList;
+static mutex threadLock;
+
+DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer)
+{
+ for (const auto & t : threadList)
+ if (t.p->peer.sameAs(peer))
+ return t;
+
+ DirectMessageThread t(new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = {},
+ });
+ threadList.push_back(t);
+ return t;
+}
+
+DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head)
+{
+ DirectMessageThread nt(new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = std::move(head),
+ });
+
+ for (auto & t : threadList)
+ if (t.p->peer.sameAs(peer)) {
+ t = nt;
+ return nt;
+ }
+
+ threadList.push_back(nt);
+ return nt;
+}
+
+
+DirectMessage::DirectMessage(Priv * p):
+ p(p)
+{}
+
+optional<DirectMessageData> DirectMessageData::load(const Ref & ref)
+{
+ auto rec = ref->asRecord();
+ if (!rec)
+ return nullopt;
+
+ vector<Stored<DirectMessageData>> prev;
+ for (auto p : rec->items("PREV"))
+ if (const auto & x = p.as<DirectMessageData>())
+ prev.push_back(*x);
+
+ auto fref = rec->item("from").asRef();
+ if (!fref)
+ return nullopt;
+
+ auto from = Identity::load(*fref);
+ if (!from)
+ return nullopt;
+
+ return DirectMessageData {
+ .prev = std::move(prev),
+ .from = *from,
+ .time = *rec->item("time").asDate(),
+ .text = rec->item("text").asText().value(),
+ };
+}
+
+Ref DirectMessageData::store(const Storage & st) const
+{
+ vector<Record::Item> items;
+
+ for (const auto prev : prev)
+ items.emplace_back("PREV", prev.ref());
+ items.emplace_back("from", from.ref().value());
+ items.emplace_back("time", time);
+ items.emplace_back("text", text);
+
+ return st.storeObject(Record(std::move(items)));
+}
+
+
+const Identity & DirectMessage::from() const
+{
+ return p->data->from;
+}
+
+const ZonedTime & DirectMessage::time() const
+{
+ return p->data->time;
+}
+
+const string & DirectMessage::text() const
+{
+ return p->data->text;
+}
+
+
+DirectMessageThread::DirectMessageThread(Priv * p):
+ p(p)
+{}
+
+DirectMessageThread::Iterator::Iterator(Priv * p):
+ p(p)
+{}
+
+DirectMessageThread::Iterator::Iterator(const Iterator & other):
+ Iterator(new Priv(*other.p))
+{}
+
+DirectMessageThread::Iterator::~Iterator() = default;
+
+DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator=(const Iterator & other)
+{
+ p.reset(new Priv(*other.p));
+ return *this;
+}
+
+DirectMessageThread::Iterator & DirectMessageThread::Iterator::operator++()
+{
+ if (p->current)
+ for (const auto & m : p->current->p->data->prev)
+ p->next.push_back(m);
+
+ if (p->next.empty()) {
+ p->current.reset();
+ } else {
+ filterAncestors(p->next);
+ auto ncur = p->next[0];
+
+ for (const auto & m : p->next)
+ if (m->time.time >= ncur->time.time)
+ ncur = m;
+
+ p->current.emplace(DirectMessage(new DirectMessage::Priv {
+ .data = ncur,
+ }));
+
+ p->next.erase(std::remove(p->next.begin(), p->next.end(), p->current->p->data));
+ }
+
+ return *this;
+}
+
+DirectMessage DirectMessageThread::Iterator::operator*() const
+{
+ return *p->current;
+}
+
+bool DirectMessageThread::Iterator::operator==(const Iterator & other) const
+{
+ if (p->current && other.p->current)
+ return p->current->p->data == other.p->current->p->data;
+ return bool(p->current) == bool(other.p->current);
+}
+
+bool DirectMessageThread::Iterator::operator!=(const Iterator & other) const
+{
+ return !(*this == other);
+}
+
+DirectMessageThread::Iterator DirectMessageThread::begin() const
+{
+ return ++Iterator(new Iterator::Priv {
+ .current = {},
+ .next = p->head,
+ });
+}
+
+DirectMessageThread::Iterator DirectMessageThread::end() const
+{
+ return Iterator(new Iterator::Priv {
+ .current = {},
+ .next = {},
+ });
+}
+
+size_t DirectMessageThread::size() const
+{
+ size_t c = 0;
+ for (auto it = begin(); it != end(); ++it)
+ c++;
+ return c;
+}
+
+DirectMessage DirectMessageThread::at(size_t i) const
+{
+ return *std::next(begin(), i);
+}
+
+const Identity & DirectMessageThread::peer() const
+{
+ return p->peer;
+}
+
+
+vector<DirectMessageService::ThreadWatcher> DirectMessageService::Priv::watchers;
+mutex DirectMessageService::Priv::watcherLock;
+
+DirectMessageService::DirectMessageService() = default;
+DirectMessageService::~DirectMessageService() = default;
+
+UUID DirectMessageService::uuid() const
+{
+ return myUUID;
+}
+
+void DirectMessageService::handle(Context & ctx) const
+{
+ auto dm = Stored<DirectMessageData>::load(ctx.ref());
+ if (!dm)
+ return;
+
+ auto pid = ctx.peer().identity();
+ if (!pid)
+ return;
+
+ unique_lock lock(threadLock);
+
+ vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(*pid).p->head);
+ head.push_back(*dm);
+ filterAncestors(head);
+ auto dmt = DirectMessageThread::Priv::updateThreadLocked(*pid, std::move(head));
+
+ lock.unlock();
+
+ for (const auto & w : Priv::watchers)
+ w(dmt, -1, -1);
+}
+
+void DirectMessageService::onUpdate(ThreadWatcher w)
+{
+ scoped_lock l(Priv::watcherLock);
+ Priv::watchers.push_back(w);
+}
+
+DirectMessageThread DirectMessageService::thread(const Identity & peer)
+{
+ scoped_lock lock(threadLock);
+ return DirectMessageThread::Priv::getThreadLocked(peer);
+}
+
+DirectMessage DirectMessageService::send(const Identity & from, const Peer & peer, const string & text)
+{
+ auto pid = peer.identity();
+ if (!pid)
+ throw std::runtime_error("Peer without known identity");
+
+ scoped_lock lock(threadLock);
+
+ auto msg = from.ref()->storage().store(DirectMessageData {
+ .prev = DirectMessageThread::Priv::getThreadLocked(*pid).p->head,
+ .from = from,
+ .time = ZonedTime::now(),
+ .text = text,
+ });
+
+ DirectMessageThread::Priv::updateThreadLocked(*pid, { msg });
+ peer.send(myUUID, msg.ref());
+
+ return DirectMessage(new DirectMessage::Priv {
+ .data = msg,
+ });
+}