From 9577599e6af4dfb36df81fe5d89801c4ce4a19ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 1 Jul 2023 20:08:48 +0200 Subject: Direct message shared state --- src/main.cpp | 35 +++++++ src/message.cpp | 282 +++++++++++++++++++++++++++++++++++++++++++++----------- src/message.h | 17 +++- 3 files changed, 278 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/main.cpp b/src/main.cpp index f9c5e98..b39130a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -289,6 +289,9 @@ void startServer(const vector &) config.service() .onUpdate([](const DirectMessageThread & thread, ssize_t, ssize_t) { + if (thread.at(0).from()->sameAs(server->identity())) + return; + ostringstream ss; string name = ""; @@ -550,6 +553,36 @@ void dmSendContact(const vector & args) args.at(1)); } +template +static void dmList(const T & peer) +{ + if (auto id = peer.identity()) + for (const auto & msg : h->behavior().get().shared().thread(*id)) { + string name = ""; + if (const auto & from = msg.from()) + if (const auto & opt = from->name()) + name = *opt; + + ostringstream ss; + ss << "dm-list-item" + << " from " << name + << " text " << msg.text() + ; + printLine(ss.str()); + } + printLine("dm-list-done"); +} + +void dmListPeer(const vector & args) +{ + dmList(getPeer(args.at(0)).peer); +} + +void dmListContact(const vector & args) +{ + dmList(getContact(args.at(0))); +} + vector commands = { { "store", store }, { "stored-generation", storedGeneration }, @@ -576,6 +609,8 @@ vector commands = { { "contact-set-name", contactSetName }, { "dm-send-peer", dmSendPeer }, { "dm-send-contact", dmSendContact }, + { "dm-list-peer", dmListPeer }, + { "dm-list-contact", dmListContact }, }; } diff --git a/src/message.cpp b/src/message.cpp index 06ee8ad..8ae1601 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -6,42 +6,39 @@ using namespace erebos; using std::nullopt; using std::scoped_lock; -using std::unique_lock; static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); -static vector threadList; -static mutex threadLock; +DEFINE_SHARED_TYPE(DirectMessageThreads, + "ee793681-5976-466a-b0f0-4e1907d3fade", + &DirectMessageThreads::load, + [](const DirectMessageThreads & threads) { + return threads.store(); + }) -DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) -{ - for (const auto & t : threadList) - if (t.p->peer.sameAs(peer)) - return t; - DirectMessageThread t(new DirectMessageThread::Priv { - .peer = peer, - .head = {}, - }); - threadList.push_back(t); - return t; +static void findThreadComponents(vector> & candidates, + const Stored & cur, + const Identity & peer, + vector> DirectMessageState::* sel) +{ + if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty()) + candidates.push_back(cur); + else + for (const auto & p : cur->prev) + findThreadComponents(candidates, p, peer, sel); } -DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector> && head) +static vector> findThreadComponents( + const vector> & leaves, + const Identity & peer, + vector> DirectMessageState::* sel) { - DirectMessageThread nt(new DirectMessageThread::Priv { - .peer = peer, - .head = std::move(head), - }); - - for (auto & t : threadList) - if (t.p->peer.sameAs(peer)) { - t = nt; - return nt; - } - - threadList.push_back(nt); - return nt; + vector> candidates; + for (const auto & obj : leaves) + findThreadComponents(candidates, obj, peer, sel); + filterAncestors(candidates); + return candidates; } @@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const } +DirectMessageState DirectMessageState::load(const Ref & ref) +{ + if (auto rec = ref->asRecord()) { + return DirectMessageState { + .prev = rec->items("PREV").as(), + .peer = Identity::load(rec->items("peer").asRef()), + + .sent = rec->items("sent").as(), + .received = rec->items("received").as(), + .seen = rec->items("seen").as(), + }; + } + + return DirectMessageState(); +} + +Ref DirectMessageState::store(const Storage & st) const +{ + vector items; + + for (const auto & prev : prev) + items.emplace_back("PREV", prev.ref()); + if (peer) + for (const auto & ref : peer->refs()) + items.emplace_back("peer", ref); + + for (const auto & x : sent) + items.emplace_back("sent", x.ref()); + for (const auto & x : received) + items.emplace_back("received", x.ref()); + for (const auto & x : seen) + items.emplace_back("seen", x.ref()); + + return st.storeObject(Record(std::move(items))); +} + + +DirectMessageThreads::DirectMessageThreads() = default; + +DirectMessageThreads::DirectMessageThreads(Stored s): + DirectMessageThreads(vector> { move(s) }) +{ +} + +DirectMessageThreads::DirectMessageThreads(vector> s): + state(move(s)) +{ +} + +DirectMessageThreads DirectMessageThreads::load(const vector & refs) +{ + DirectMessageThreads res; + res.state.reserve(refs.size()); + for (const auto & ref : refs) + res.state.push_back(Stored::load(ref)); + return res; +} + +vector DirectMessageThreads::store() const +{ + vector refs; + refs.reserve(state.size()); + for (const auto & x : state) + refs.push_back(x.ref()); + return refs; +} + +vector> DirectMessageThreads::data() const +{ + return state; +} + +bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const +{ + return state == other.state; +} + +bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const +{ + return state != other.state; +} + +DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const +{ + vector> head; + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent)) + for (const auto & m : c->sent) + head.push_back(m); + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received)) + for (const auto & m : c->received) + head.push_back(m); + filterAncestors(head); + + return new DirectMessageThread::Priv { + .peer = peer, + .head = move(head), + }; +} + +vector> Mergeable::components(const DirectMessageThreads & threads) +{ + return threads.data(); +} + + DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w) { watchers.push_back(w); @@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc DirectMessageService::DirectMessageService(Config && c, const Server & s): config(move(c)), - server(s) + server(s), + watched(server.localState().lens().lens().watch( + std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1))) {} DirectMessageService::~DirectMessageService() = default; @@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx) return; auto powner = pid->finalOwner(); - unique_lock lock(threadLock); - - vector> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head); - head.push_back(Stored::load(ctx.ref())); - filterAncestors(head); - auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head)); - - lock.unlock(); - - for (const auto & w : config.watchers) - w(dmt, -1, -1); + auto msg = Stored::load(ctx.ref()); + + server.localHead().update([&](const Stored & loc) { + auto st = loc.ref().storage(); + auto threads = loc->shared(); + + vector> receivedOld; + for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received)) + for (const auto & m : c->received) + receivedOld.push_back(m); + auto receivedNew = receivedOld; + receivedNew.push_back(msg); + filterAncestors(receivedNew); + + if (receivedNew != receivedOld) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = powner, + .sent = {}, + .received = { msg }, + .seen = {}, + }); + + auto res = st.store(loc->shared(DirectMessageThreads(state))); + return res; + } else { + return loc; + } + }); } DirectMessageThread DirectMessageService::thread(const Identity & peer) { - scoped_lock lock(threadLock); - return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner()); + return server.localState().get().shared().thread(peer); } DirectMessage DirectMessageService::send(const Identity & to, const string & text) { - scoped_lock lock(threadLock); - - auto msg = server.localHead().ref().storage().store(DirectMessageData { - .prev = DirectMessageThread::Priv::getThreadLocked(to).p->head, - .from = server.identity().finalOwner(), - .time = ZonedTime::now(), - .text = text, + Stored msg; + + server.localHead().update([&](const Stored & loc) { + auto st = loc.ref().storage(); + + auto threads = loc->shared(); + msg = st.store(DirectMessageData { + .prev = threads.thread(to).p->head, + .from = server.identity().finalOwner(), + .time = ZonedTime::now(), + .text = text, + }); + + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = to, + .sent = { msg }, + .received = {}, + .seen = {}, + }); + + return st.store(loc->shared(DirectMessageThreads(state))); }); - DirectMessageThread::Priv::updateThreadLocked(to, { msg }); - if (auto peer = server.peer(to)) peer->send(myUUID, msg.ref()); return DirectMessage(new DirectMessage::Priv { - .data = msg, + .data = move(msg), }); } @@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text) return send(id->finalOwner(), text); throw std::runtime_error("peer without known identity"); } + +void DirectMessageService::updateHandler(const DirectMessageThreads & threads) +{ + scoped_lock lock(stateMutex); + + auto state = prevState; + for (const auto & s : threads.state) + state.push_back(s); + filterAncestors(state); + + if (state != prevState) { + auto queue = state; + vector peers; + + while (not queue.empty()) { + auto cur = move(queue.back()); + queue.pop_back(); + + if (auto peer = cur->peer) { + bool found = false; + for (const auto & p : peers) { + if (p.sameAs(*peer)) { + found = true; + break; + } + } + + if (not found) + peers.push_back(*peer); + + for (const auto & prev : cur->prev) + queue.push_back(prev); + } + } + + for (const auto & peer : peers) { + auto dmt = threads.thread(peer); + for (const auto & w : config.watchers) + w(dmt, -1, -1); + } + + prevState = move(state); + } +} diff --git a/src/message.h b/src/message.h index c3c6ba4..4e99cd1 100644 --- a/src/message.h +++ b/src/message.h @@ -37,10 +37,6 @@ struct DirectMessageThread::Priv { const Identity peer; const vector> head; - - static DirectMessageThread getThreadLocked(const Identity & peer); - static DirectMessageThread updateThreadLocked(const Identity & peer, - vector> && head); }; struct DirectMessageThread::Iterator::Priv @@ -49,4 +45,17 @@ struct DirectMessageThread::Iterator::Priv vector> next; }; +struct DirectMessageState +{ + static DirectMessageState load(const Ref &); + Ref store(const Storage &) const; + + vector> prev; + optional peer; + + vector> sent; + vector> received; + vector> seen; +}; + } -- cgit v1.2.3