summaryrefslogtreecommitdiff
path: root/src/message.cpp
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-07-01 20:08:48 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-07-01 22:07:27 +0200
commit9577599e6af4dfb36df81fe5d89801c4ce4a19ab (patch)
tree1d383f6f3466c3c2198b4de2a2f53b6ad500fe39 /src/message.cpp
parentafab7dc5673fbc5fd600182612626676ae36d1c0 (diff)
Direct message shared state
Diffstat (limited to 'src/message.cpp')
-rw-r--r--src/message.cpp282
1 files changed, 230 insertions, 52 deletions
diff --git a/src/message.cpp b/src/message.cpp
index 06ee8ad..8ae1601 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -6,42 +6,39 @@
using namespace erebos;
using std::nullopt;
using std::scoped_lock;
-using std::unique_lock;
static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
-static vector<DirectMessageThread> threadList;
-static mutex threadLock;
+DEFINE_SHARED_TYPE(DirectMessageThreads,
+ "ee793681-5976-466a-b0f0-4e1907d3fade",
+ &DirectMessageThreads::load,
+ [](const DirectMessageThreads & threads) {
+ return threads.store();
+ })
-DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer)
-{
- for (const auto & t : threadList)
- if (t.p->peer.sameAs(peer))
- return t;
- DirectMessageThread t(new DirectMessageThread::Priv {
- .peer = peer,
- .head = {},
- });
- threadList.push_back(t);
- return t;
+static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates,
+ const Stored<DirectMessageState> & cur,
+ const Identity & peer,
+ vector<Stored<DirectMessageData>> DirectMessageState::* sel)
+{
+ if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty())
+ candidates.push_back(cur);
+ else
+ for (const auto & p : cur->prev)
+ findThreadComponents(candidates, p, peer, sel);
}
-DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head)
+static vector<Stored<DirectMessageState>> findThreadComponents(
+ const vector<Stored<DirectMessageState>> & leaves,
+ const Identity & peer,
+ vector<Stored<DirectMessageData>> DirectMessageState::* sel)
{
- DirectMessageThread nt(new DirectMessageThread::Priv {
- .peer = peer,
- .head = std::move(head),
- });
-
- for (auto & t : threadList)
- if (t.p->peer.sameAs(peer)) {
- t = nt;
- return nt;
- }
-
- threadList.push_back(nt);
- return nt;
+ vector<Stored<DirectMessageState>> candidates;
+ for (const auto & obj : leaves)
+ findThreadComponents(candidates, obj, peer, sel);
+ filterAncestors(candidates);
+ return candidates;
}
@@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const
}
+DirectMessageState DirectMessageState::load(const Ref & ref)
+{
+ if (auto rec = ref->asRecord()) {
+ return DirectMessageState {
+ .prev = rec->items("PREV").as<DirectMessageState>(),
+ .peer = Identity::load(rec->items("peer").asRef()),
+
+ .sent = rec->items("sent").as<DirectMessageData>(),
+ .received = rec->items("received").as<DirectMessageData>(),
+ .seen = rec->items("seen").as<DirectMessageData>(),
+ };
+ }
+
+ return DirectMessageState();
+}
+
+Ref DirectMessageState::store(const Storage & st) const
+{
+ vector<Record::Item> items;
+
+ for (const auto & prev : prev)
+ items.emplace_back("PREV", prev.ref());
+ if (peer)
+ for (const auto & ref : peer->refs())
+ items.emplace_back("peer", ref);
+
+ for (const auto & x : sent)
+ items.emplace_back("sent", x.ref());
+ for (const auto & x : received)
+ items.emplace_back("received", x.ref());
+ for (const auto & x : seen)
+ items.emplace_back("seen", x.ref());
+
+ return st.storeObject(Record(std::move(items)));
+}
+
+
+DirectMessageThreads::DirectMessageThreads() = default;
+
+DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s):
+ DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) })
+{
+}
+
+DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s):
+ state(move(s))
+{
+}
+
+DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs)
+{
+ DirectMessageThreads res;
+ res.state.reserve(refs.size());
+ for (const auto & ref : refs)
+ res.state.push_back(Stored<DirectMessageState>::load(ref));
+ return res;
+}
+
+vector<Ref> DirectMessageThreads::store() const
+{
+ vector<Ref> refs;
+ refs.reserve(state.size());
+ for (const auto & x : state)
+ refs.push_back(x.ref());
+ return refs;
+}
+
+vector<Stored<DirectMessageState>> DirectMessageThreads::data() const
+{
+ return state;
+}
+
+bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const
+{
+ return state == other.state;
+}
+
+bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const
+{
+ return state != other.state;
+}
+
+DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const
+{
+ vector<Stored<DirectMessageData>> head;
+ for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent))
+ for (const auto & m : c->sent)
+ head.push_back(m);
+ for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received))
+ for (const auto & m : c->received)
+ head.push_back(m);
+ filterAncestors(head);
+
+ return new DirectMessageThread::Priv {
+ .peer = peer,
+ .head = move(head),
+ };
+}
+
+vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads)
+{
+ return threads.data();
+}
+
+
DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w)
{
watchers.push_back(w);
@@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc
DirectMessageService::DirectMessageService(Config && c, const Server & s):
config(move(c)),
- server(s)
+ server(s),
+ watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch(
+ std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1)))
{}
DirectMessageService::~DirectMessageService() = default;
@@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx)
return;
auto powner = pid->finalOwner();
- unique_lock lock(threadLock);
-
- vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head);
- head.push_back(Stored<DirectMessageData>::load(ctx.ref()));
- filterAncestors(head);
- auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head));
-
- lock.unlock();
-
- for (const auto & w : config.watchers)
- w(dmt, -1, -1);
+ auto msg = Stored<DirectMessageData>::load(ctx.ref());
+
+ server.localHead().update([&](const Stored<LocalState> & loc) {
+ auto st = loc.ref().storage();
+ auto threads = loc->shared<DirectMessageThreads>();
+
+ vector<Stored<DirectMessageData>> receivedOld;
+ for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received))
+ for (const auto & m : c->received)
+ receivedOld.push_back(m);
+ auto receivedNew = receivedOld;
+ receivedNew.push_back(msg);
+ filterAncestors(receivedNew);
+
+ if (receivedNew != receivedOld) {
+ auto state = st.store(DirectMessageState {
+ .prev = threads.data(),
+ .peer = powner,
+ .sent = {},
+ .received = { msg },
+ .seen = {},
+ });
+
+ auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
+ return res;
+ } else {
+ return loc;
+ }
+ });
}
DirectMessageThread DirectMessageService::thread(const Identity & peer)
{
- scoped_lock lock(threadLock);
- return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner());
+ return server.localState().get().shared<DirectMessageThreads>().thread(peer);
}
DirectMessage DirectMessageService::send(const Identity & to, const string & text)
{
- scoped_lock lock(threadLock);
-
- auto msg = server.localHead().ref().storage().store(DirectMessageData {
- .prev = DirectMessageThread::Priv::getThreadLocked(to).p->head,
- .from = server.identity().finalOwner(),
- .time = ZonedTime::now(),
- .text = text,
+ Stored<DirectMessageData> msg;
+
+ server.localHead().update([&](const Stored<LocalState> & loc) {
+ auto st = loc.ref().storage();
+
+ auto threads = loc->shared<DirectMessageThreads>();
+ msg = st.store(DirectMessageData {
+ .prev = threads.thread(to).p->head,
+ .from = server.identity().finalOwner(),
+ .time = ZonedTime::now(),
+ .text = text,
+ });
+
+ auto state = st.store(DirectMessageState {
+ .prev = threads.data(),
+ .peer = to,
+ .sent = { msg },
+ .received = {},
+ .seen = {},
+ });
+
+ return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
});
- DirectMessageThread::Priv::updateThreadLocked(to, { msg });
-
if (auto peer = server.peer(to))
peer->send(myUUID, msg.ref());
return DirectMessage(new DirectMessage::Priv {
- .data = msg,
+ .data = move(msg),
});
}
@@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text)
return send(id->finalOwner(), text);
throw std::runtime_error("peer without known identity");
}
+
+void DirectMessageService::updateHandler(const DirectMessageThreads & threads)
+{
+ scoped_lock lock(stateMutex);
+
+ auto state = prevState;
+ for (const auto & s : threads.state)
+ state.push_back(s);
+ filterAncestors(state);
+
+ if (state != prevState) {
+ auto queue = state;
+ vector<Identity> peers;
+
+ while (not queue.empty()) {
+ auto cur = move(queue.back());
+ queue.pop_back();
+
+ if (auto peer = cur->peer) {
+ bool found = false;
+ for (const auto & p : peers) {
+ if (p.sameAs(*peer)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (not found)
+ peers.push_back(*peer);
+
+ for (const auto & prev : cur->prev)
+ queue.push_back(prev);
+ }
+ }
+
+ for (const auto & peer : peers) {
+ auto dmt = threads.thread(peer);
+ for (const auto & w : config.watchers)
+ w(dmt, -1, -1);
+ }
+
+ prevState = move(state);
+ }
+}