summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-07-01 20:08:48 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-07-01 22:07:27 +0200
commit9577599e6af4dfb36df81fe5d89801c4ce4a19ab (patch)
tree1d383f6f3466c3c2198b4de2a2f53b6ad500fe39
parentafab7dc5673fbc5fd600182612626676ae36d1c0 (diff)
Direct message shared state
-rw-r--r--include/erebos/message.h43
-rw-r--r--include/erebos/storage.h1
-rw-r--r--src/main.cpp35
-rw-r--r--src/message.cpp282
-rw-r--r--src/message.h17
-rw-r--r--test/message.test67
6 files changed, 377 insertions, 68 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h
index 011007a..194044c 100644
--- a/include/erebos/message.h
+++ b/include/erebos/message.h
@@ -1,19 +1,23 @@
#pragma once
+#include <erebos/merge.h>
#include <erebos/service.h>
#include <chrono>
#include <functional>
#include <memory>
+#include <mutex>
#include <optional>
#include <string>
namespace erebos {
+using std::mutex;
using std::unique_ptr;
class Contact;
class Identity;
+struct DirectMessageState;
class DirectMessage
{
@@ -67,11 +71,43 @@ public:
private:
friend class DirectMessageService;
+ friend class DirectMessageThreads;
struct Priv;
DirectMessageThread(Priv *);
std::shared_ptr<Priv> p;
};
+class DirectMessageThreads
+{
+public:
+ DirectMessageThreads();
+ DirectMessageThreads(Stored<DirectMessageState>);
+ DirectMessageThreads(vector<Stored<DirectMessageState>>);
+
+ static DirectMessageThreads load(const vector<Ref> & refs);
+ vector<Ref> store() const;
+ vector<Stored<DirectMessageState>> data() const;
+
+ bool operator==(const DirectMessageThreads &) const;
+ bool operator!=(const DirectMessageThreads &) const;
+
+ DirectMessageThread thread(const Identity &) const;
+
+private:
+ vector<Stored<DirectMessageState>> state;
+
+ friend class DirectMessageService;
+};
+
+DECLARE_SHARED_TYPE(DirectMessageThreads)
+
+template<> struct Mergeable<DirectMessageThreads>
+{
+ using Component = DirectMessageState;
+ static vector<Stored<DirectMessageState>> components(const DirectMessageThreads &);
+ static Contact merge(vector<Stored<DirectMessageState>>);
+};
+
class DirectMessageService : public Service
{
public:
@@ -100,8 +136,15 @@ public:
DirectMessage send(const Peer &, const std::string &);
private:
+ void updateHandler(const DirectMessageThreads &);
+
const Config config;
const Server & server;
+
+ vector<Stored<DirectMessageState>> prevState;
+ mutex stateMutex;
+
+ Watched<DirectMessageThreads> watched;
};
}
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 32049db..8a701e9 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -474,6 +474,7 @@ class Stored
friend class Storage;
friend class Head<T>;
public:
+ Stored() = default;
Stored(const Stored &) = default;
Stored(Stored &&) = default;
Stored & operator=(const Stored &) = default;
diff --git a/src/main.cpp b/src/main.cpp
index f9c5e98..b39130a 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -289,6 +289,9 @@ void startServer(const vector<string> &)
config.service<DirectMessageService>()
.onUpdate([](const DirectMessageThread & thread, ssize_t, ssize_t) {
+ if (thread.at(0).from()->sameAs(server->identity()))
+ return;
+
ostringstream ss;
string name = "<unnamed>";
@@ -550,6 +553,36 @@ void dmSendContact(const vector<string> & args)
args.at(1));
}
+template<class T>
+static void dmList(const T & peer)
+{
+ if (auto id = peer.identity())
+ for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) {
+ string name = "<unnamed>";
+ if (const auto & from = msg.from())
+ if (const auto & opt = from->name())
+ name = *opt;
+
+ ostringstream ss;
+ ss << "dm-list-item"
+ << " from " << name
+ << " text " << msg.text()
+ ;
+ printLine(ss.str());
+ }
+ printLine("dm-list-done");
+}
+
+void dmListPeer(const vector<string> & args)
+{
+ dmList(getPeer(args.at(0)).peer);
+}
+
+void dmListContact(const vector<string> & args)
+{
+ dmList(getContact(args.at(0)));
+}
+
vector<Command> commands = {
{ "store", store },
{ "stored-generation", storedGeneration },
@@ -576,6 +609,8 @@ vector<Command> commands = {
{ "contact-set-name", contactSetName },
{ "dm-send-peer", dmSendPeer },
{ "dm-send-contact", dmSendContact },
+ { "dm-list-peer", dmListPeer },
+ { "dm-list-contact", dmListContact },
};
}
diff --git a/src/message.cpp b/src/message.cpp
index 06ee8ad..8ae1601 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -6,42 +6,39 @@
using namespace erebos;
using std::nullopt;
using std::scoped_lock;
-using std::unique_lock;
static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
-static vector<DirectMessageThread> threadList;
-static mutex threadLock;
+DEFINE_SHARED_TYPE(DirectMessageThreads,
+ "ee793681-5976-466a-b0f0-4e1907d3fade",
+ &DirectMessageThreads::load,
+ [](const DirectMessageThreads & threads) {
+ return threads.store();
+ })
-DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer)
-{
- for (const auto & t : threadList)
- if (t.p->peer.sameAs(peer))
- return t;
- DirectMessageThread t(new DirectMessageThread::Priv {
- .peer = peer,
- .head = {},
- });
- threadList.push_back(t);
- return t;
+static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates,
+ const Stored<DirectMessageState> & cur,
+ const Identity & peer,
+ vector<Stored<DirectMessageData>> DirectMessageState::* sel)
+{
+ if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty())
+ candidates.push_back(cur);
+ else
+ for (const auto & p : cur->prev)
+ findThreadComponents(candidates, p, peer, sel);
}
-DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head)
+static vector<Stored<DirectMessageState>> findThreadComponents(
+ const vector<Stored<DirectMessageState>> & leaves,
+ const Identity & peer,
+ vector<Stored<DirectMessageData>> DirectMessageState::* sel)
{
- DirectMessageThread nt(new DirectMessageThread::Priv {
- .peer = peer,
- .head = std::move(head),
- });
-
- for (auto & t : threadList)
- if (t.p->peer.sameAs(peer)) {
- t = nt;
- return nt;
- }
-
- threadList.push_back(nt);
- return nt;
+ vector<Stored<DirectMessageState>> candidates;
+ for (const auto & obj : leaves)
+ findThreadComponents(candidates, obj, peer, sel);
+ filterAncestors(candidates);
+ return candidates;
}
@@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const
}
+DirectMessageState DirectMessageState::load(const Ref & ref)
+{
+ if (auto rec = ref->asRecord()) {
+ return DirectMessageState {
+ .prev = rec->items("PREV").as<DirectMessageState>(),
+ .peer = Identity::load(rec->items("peer").asRef()),
+
+ .sent = rec->items("sent").as<DirectMessageData>(),
+ .received = rec->items("received").as<DirectMessageData>(),
+ .seen = rec->items("seen").as<DirectMessageData>(),
+ };
+ }
+
+ return DirectMessageState();
+}
+
+Ref DirectMessageState::store(const Storage & st) const
+{
+ vector<Record::Item> items;
+
+ for (const auto & prev : prev)
+ items.emplace_back("PREV", prev.ref());
+ if (peer)
+ for (const auto & ref : peer->refs())
+ items.emplace_back("peer", ref);
+
+ for (const auto & x : sent)
+ items.emplace_back("sent", x.ref());
+ for (const auto & x : received)
+ items.emplace_back("received", x.ref());
+ for (const auto & x : seen)
+ items.emplace_back("seen", x.ref());
+
+ return st.storeObject(Record(std::move(items)));
+}
+
+
+DirectMessageThreads::DirectMessageThreads() = default;
+
+DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s):
+ DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) })
+{
+}
+
+DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s):
+ state(move(s))
+{
+}
+
+DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs)
+{
+ DirectMessageThreads res;
+ res.state.reserve(refs.size());
+ for (const auto & ref : refs)
+ res.state.push_back(Stored<DirectMessageState>::load(ref));
+ return res;
+}
+
+vector<Ref> DirectMessageThreads::store() const
+{
+ vector<Ref> refs;
+ refs.reserve(state.size());
+ for (const auto & x : state)
+ refs.push_back(x.ref());
+ return refs;
+}
+
+vector<Stored<DirectMessageState>> DirectMessageThreads::data() const
+{
+ return state;
+}
+
+bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const
+{
+ return state == other.state;
+}
+
+bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const
+{
+ return state != other.state;
+}
+
+DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const
+{
+ vector<Stored<DirectMessageData>> head;
+ for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent))
+ for (const auto & m : c->sent)
+ head.push_back(m);
+ for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received))
+ for (const auto & m : c->received)
+ head.push_back(m);
+ filterAncestors(head);
+
+ return new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = move(head),
+ };
+}
+
+vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads)
+{
+ return threads.data();
+}
+
+
DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w)
{
watchers.push_back(w);
@@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc
DirectMessageService::DirectMessageService(Config && c, const Server & s):
config(move(c)),
- server(s)
+ server(s),
+ watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch(
+ std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1)))
{}
DirectMessageService::~DirectMessageService() = default;
@@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx)
return;
auto powner = pid->finalOwner();
- unique_lock lock(threadLock);
-
- vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head);
- head.push_back(Stored<DirectMessageData>::load(ctx.ref()));
- filterAncestors(head);
- auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head));
-
- lock.unlock();
-
- for (const auto & w : config.watchers)
- w(dmt, -1, -1);
+ auto msg = Stored<DirectMessageData>::load(ctx.ref());
+
+ server.localHead().update([&](const Stored<LocalState> & loc) {
+ auto st = loc.ref().storage();
+ auto threads = loc->shared<DirectMessageThreads>();
+
+ vector<Stored<DirectMessageData>> receivedOld;
+ for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received))
+ for (const auto & m : c->received)
+ receivedOld.push_back(m);
+ auto receivedNew = receivedOld;
+ receivedNew.push_back(msg);
+ filterAncestors(receivedNew);
+
+ if (receivedNew != receivedOld) {
+ auto state = st.store(DirectMessageState {
+ .prev = threads.data(),
+ .peer = powner,
+ .sent = {},
+ .received = { msg },
+ .seen = {},
+ });
+
+ auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
+ return res;
+ } else {
+ return loc;
+ }
+ });
}
DirectMessageThread DirectMessageService::thread(const Identity & peer)
{
- scoped_lock lock(threadLock);
- return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner());
+ return server.localState().get().shared<DirectMessageThreads>().thread(peer);
}
DirectMessage DirectMessageService::send(const Identity & to, const string & text)
{
- scoped_lock lock(threadLock);
-
- auto msg = server.localHead().ref().storage().store(DirectMessageData {
- .prev = DirectMessageThread::Priv::getThreadLocked(to).p->head,
- .from = server.identity().finalOwner(),
- .time = ZonedTime::now(),
- .text = text,
+ Stored<DirectMessageData> msg;
+
+ server.localHead().update([&](const Stored<LocalState> & loc) {
+ auto st = loc.ref().storage();
+
+ auto threads = loc->shared<DirectMessageThreads>();
+ msg = st.store(DirectMessageData {
+ .prev = threads.thread(to).p->head,
+ .from = server.identity().finalOwner(),
+ .time = ZonedTime::now(),
+ .text = text,
+ });
+
+ auto state = st.store(DirectMessageState {
+ .prev = threads.data(),
+ .peer = to,
+ .sent = { msg },
+ .received = {},
+ .seen = {},
+ });
+
+ return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
});
- DirectMessageThread::Priv::updateThreadLocked(to, { msg });
-
if (auto peer = server.peer(to))
peer->send(myUUID, msg.ref());
return DirectMessage(new DirectMessage::Priv {
- .data = msg,
+ .data = move(msg),
});
}
@@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text)
return send(id->finalOwner(), text);
throw std::runtime_error("peer without known identity");
}
+
+void DirectMessageService::updateHandler(const DirectMessageThreads & threads)
+{
+ scoped_lock lock(stateMutex);
+
+ auto state = prevState;
+ for (const auto & s : threads.state)
+ state.push_back(s);
+ filterAncestors(state);
+
+ if (state != prevState) {
+ auto queue = state;
+ vector<Identity> peers;
+
+ while (not queue.empty()) {
+ auto cur = move(queue.back());
+ queue.pop_back();
+
+ if (auto peer = cur->peer) {
+ bool found = false;
+ for (const auto & p : peers) {
+ if (p.sameAs(*peer)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (not found)
+ peers.push_back(*peer);
+
+ for (const auto & prev : cur->prev)
+ queue.push_back(prev);
+ }
+ }
+
+ for (const auto & peer : peers) {
+ auto dmt = threads.thread(peer);
+ for (const auto & w : config.watchers)
+ w(dmt, -1, -1);
+ }
+
+ prevState = move(state);
+ }
+}
diff --git a/src/message.h b/src/message.h
index c3c6ba4..4e99cd1 100644
--- a/src/message.h
+++ b/src/message.h
@@ -37,10 +37,6 @@ struct DirectMessageThread::Priv
{
const Identity peer;
const vector<Stored<DirectMessageData>> head;
-
- static DirectMessageThread getThreadLocked(const Identity & peer);
- static DirectMessageThread updateThreadLocked(const Identity & peer,
- vector<Stored<DirectMessageData>> && head);
};
struct DirectMessageThread::Iterator::Priv
@@ -49,4 +45,17 @@ struct DirectMessageThread::Iterator::Priv
vector<Stored<DirectMessageData>> next;
};
+struct DirectMessageState
+{
+ static DirectMessageState load(const Ref &);
+ Ref store(const Storage &) const;
+
+ vector<Stored<DirectMessageState>> prev;
+ optional<Identity> peer;
+
+ vector<Stored<DirectMessageData>> sent;
+ vector<Stored<DirectMessageData>> received;
+ vector<Stored<DirectMessageData>> seen;
+};
+
}
diff --git a/test/message.test b/test/message.test
index 3f108d3..e317116 100644
--- a/test/message.test
+++ b/test/message.test
@@ -14,6 +14,10 @@ test DirectMessage:
/peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1
/peer $peer2_1 id Device1 Owner1/
+ with p1:
+ send "dm-list-peer $peer1_2"
+ expect /dm-list-done/
+
# Send messages to peers
for i in [1..2]:
@@ -21,14 +25,14 @@ test DirectMessage:
expect /dm-received from Owner1 text hello$i/ from p2
for i in [1..2]:
- send "dm-send-peer $peer2_1 hello$i" to p2
- expect /dm-received from Owner2 text hello$i/ from p1
+ send "dm-send-peer $peer2_1 hi$i" to p2
+ expect /dm-received from Owner2 text hi$i/ from p1
for i in [3..4]:
send "dm-send-peer $peer1_2 hello$i" to p1
expect /dm-received from Owner1 text hello$i/ from p2
- send "dm-send-peer $peer2_1 hello$i" to p2
- expect /dm-received from Owner2 text hello$i/ from p1
+ send "dm-send-peer $peer2_1 hi$i" to p2
+ expect /dm-received from Owner2 text hi$i/ from p1
# Create contacts
@@ -56,15 +60,54 @@ test DirectMessage:
# Send messages to contacts
for i in [1..2]:
- send "dm-send-contact $c1_2 hello$i" to p1
- expect /dm-received from Owner1 text hello$i/ from p2
+ send "dm-send-contact $c1_2 hello_c_$i" to p1
+ expect /dm-received from Owner1 text hello_c_$i/ from p2
for i in [1..2]:
- send "dm-send-contact $c2_1 hello$i" to p2
- expect /dm-received from Owner2 text hello$i/ from p1
+ send "dm-send-contact $c2_1 hi_c_$i" to p2
+ expect /dm-received from Owner2 text hi_c_$i/ from p1
for i in [3..4]:
- send "dm-send-contact $c1_2 hello$i" to p1
- expect /dm-received from Owner1 text hello$i/ from p2
- send "dm-send-contact $c2_1 hello$i" to p2
- expect /dm-received from Owner2 text hello$i/ from p1
+ send "dm-send-contact $c1_2 hello_c_$i" to p1
+ expect /dm-received from Owner1 text hello_c_$i/ from p2
+ send "dm-send-contact $c2_1 hi_c_$i" to p2
+ expect /dm-received from Owner2 text hi_c_$i/ from p1
+
+ send "dm-list-contact $c1_2" to p1
+ send "dm-list-contact $c2_1" to p2
+ for p in [p1, p2]:
+ with p:
+ for i in [1..4]:
+ expect /dm-list-item from Owner1 text hello_c_$i/
+ expect /dm-list-item from Owner2 text hi_c_$i/
+ for i in [1..4]:
+ expect /dm-list-item from Owner1 text hello$i/
+ expect /dm-list-item from Owner2 text hi$i/
+ expect /dm-list-(.*)/ capture done
+ guard (done == "done")
+
+ # Reload message history
+
+ with p1:
+ send "stop-server"
+ expect /stop-server-done/
+ send "start-server"
+
+ send "contact-list"
+ expect:
+ /contact-list-item $c1_2 Owner2 Owner2/
+ /contact-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "dm-list-contact $c1_2" to p1
+ send "dm-list-contact $c2_1" to p2
+ for p in [p1, p2]:
+ with p:
+ for i in [1..4]:
+ expect /dm-list-item from Owner1 text hello_c_$i/
+ expect /dm-list-item from Owner2 text hi_c_$i/
+ for i in [1..4]:
+ expect /dm-list-item from Owner1 text hello$i/
+ expect /dm-list-item from Owner2 text hi$i/
+ expect /dm-list-(.*)/ capture done
+ guard (done == "done")