diff options
-rw-r--r-- | include/erebos/message.h | 43 | ||||
-rw-r--r-- | include/erebos/storage.h | 1 | ||||
-rw-r--r-- | src/main.cpp | 35 | ||||
-rw-r--r-- | src/message.cpp | 282 | ||||
-rw-r--r-- | src/message.h | 17 | ||||
-rw-r--r-- | test/message.test | 67 |
6 files changed, 377 insertions, 68 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h index 011007a..194044c 100644 --- a/include/erebos/message.h +++ b/include/erebos/message.h @@ -1,19 +1,23 @@ #pragma once +#include <erebos/merge.h> #include <erebos/service.h> #include <chrono> #include <functional> #include <memory> +#include <mutex> #include <optional> #include <string> namespace erebos { +using std::mutex; using std::unique_ptr; class Contact; class Identity; +struct DirectMessageState; class DirectMessage { @@ -67,11 +71,43 @@ public: private: friend class DirectMessageService; + friend class DirectMessageThreads; struct Priv; DirectMessageThread(Priv *); std::shared_ptr<Priv> p; }; +class DirectMessageThreads +{ +public: + DirectMessageThreads(); + DirectMessageThreads(Stored<DirectMessageState>); + DirectMessageThreads(vector<Stored<DirectMessageState>>); + + static DirectMessageThreads load(const vector<Ref> & refs); + vector<Ref> store() const; + vector<Stored<DirectMessageState>> data() const; + + bool operator==(const DirectMessageThreads &) const; + bool operator!=(const DirectMessageThreads &) const; + + DirectMessageThread thread(const Identity &) const; + +private: + vector<Stored<DirectMessageState>> state; + + friend class DirectMessageService; +}; + +DECLARE_SHARED_TYPE(DirectMessageThreads) + +template<> struct Mergeable<DirectMessageThreads> +{ + using Component = DirectMessageState; + static vector<Stored<DirectMessageState>> components(const DirectMessageThreads &); + static Contact merge(vector<Stored<DirectMessageState>>); +}; + class DirectMessageService : public Service { public: @@ -100,8 +136,15 @@ public: DirectMessage send(const Peer &, const std::string &); private: + void updateHandler(const DirectMessageThreads &); + const Config config; const Server & server; + + vector<Stored<DirectMessageState>> prevState; + mutex stateMutex; + + Watched<DirectMessageThreads> watched; }; } diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 32049db..8a701e9 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -474,6 +474,7 @@ class Stored friend class Storage; friend class Head<T>; public: + Stored() = default; Stored(const Stored &) = default; Stored(Stored &&) = default; Stored & operator=(const Stored &) = default; diff --git a/src/main.cpp b/src/main.cpp index f9c5e98..b39130a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -289,6 +289,9 @@ void startServer(const vector<string> &) config.service<DirectMessageService>() .onUpdate([](const DirectMessageThread & thread, ssize_t, ssize_t) { + if (thread.at(0).from()->sameAs(server->identity())) + return; + ostringstream ss; string name = "<unnamed>"; @@ -550,6 +553,36 @@ void dmSendContact(const vector<string> & args) args.at(1)); } +template<class T> +static void dmList(const T & peer) +{ + if (auto id = peer.identity()) + for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) { + string name = "<unnamed>"; + if (const auto & from = msg.from()) + if (const auto & opt = from->name()) + name = *opt; + + ostringstream ss; + ss << "dm-list-item" + << " from " << name + << " text " << msg.text() + ; + printLine(ss.str()); + } + printLine("dm-list-done"); +} + +void dmListPeer(const vector<string> & args) +{ + dmList(getPeer(args.at(0)).peer); +} + +void dmListContact(const vector<string> & args) +{ + dmList(getContact(args.at(0))); +} + vector<Command> commands = { { "store", store }, { "stored-generation", storedGeneration }, @@ -576,6 +609,8 @@ vector<Command> commands = { { "contact-set-name", contactSetName }, { "dm-send-peer", dmSendPeer }, { "dm-send-contact", dmSendContact }, + { "dm-list-peer", dmListPeer }, + { "dm-list-contact", dmListContact }, }; } diff --git a/src/message.cpp b/src/message.cpp index 06ee8ad..8ae1601 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -6,42 +6,39 @@ using namespace erebos; using std::nullopt; using std::scoped_lock; -using std::unique_lock; static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); -static vector<DirectMessageThread> threadList; -static mutex threadLock; +DEFINE_SHARED_TYPE(DirectMessageThreads, + "ee793681-5976-466a-b0f0-4e1907d3fade", + &DirectMessageThreads::load, + [](const DirectMessageThreads & threads) { + return threads.store(); + }) -DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) -{ - for (const auto & t : threadList) - if (t.p->peer.sameAs(peer)) - return t; - DirectMessageThread t(new DirectMessageThread::Priv { - .peer = peer, - .head = {}, - }); - threadList.push_back(t); - return t; +static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates, + const Stored<DirectMessageState> & cur, + const Identity & peer, + vector<Stored<DirectMessageData>> DirectMessageState::* sel) +{ + if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty()) + candidates.push_back(cur); + else + for (const auto & p : cur->prev) + findThreadComponents(candidates, p, peer, sel); } -DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head) +static vector<Stored<DirectMessageState>> findThreadComponents( + const vector<Stored<DirectMessageState>> & leaves, + const Identity & peer, + vector<Stored<DirectMessageData>> DirectMessageState::* sel) { - DirectMessageThread nt(new DirectMessageThread::Priv { - .peer = peer, - .head = std::move(head), - }); - - for (auto & t : threadList) - if (t.p->peer.sameAs(peer)) { - t = nt; - return nt; - } - - threadList.push_back(nt); - return nt; + vector<Stored<DirectMessageState>> candidates; + for (const auto & obj : leaves) + findThreadComponents(candidates, obj, peer, sel); + filterAncestors(candidates); + return candidates; } @@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const } +DirectMessageState DirectMessageState::load(const Ref & ref) +{ + if (auto rec = ref->asRecord()) { + return DirectMessageState { + .prev = rec->items("PREV").as<DirectMessageState>(), + .peer = Identity::load(rec->items("peer").asRef()), + + .sent = rec->items("sent").as<DirectMessageData>(), + .received = rec->items("received").as<DirectMessageData>(), + .seen = rec->items("seen").as<DirectMessageData>(), + }; + } + + return DirectMessageState(); +} + +Ref DirectMessageState::store(const Storage & st) const +{ + vector<Record::Item> items; + + for (const auto & prev : prev) + items.emplace_back("PREV", prev.ref()); + if (peer) + for (const auto & ref : peer->refs()) + items.emplace_back("peer", ref); + + for (const auto & x : sent) + items.emplace_back("sent", x.ref()); + for (const auto & x : received) + items.emplace_back("received", x.ref()); + for (const auto & x : seen) + items.emplace_back("seen", x.ref()); + + return st.storeObject(Record(std::move(items))); +} + + +DirectMessageThreads::DirectMessageThreads() = default; + +DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s): + DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) }) +{ +} + +DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s): + state(move(s)) +{ +} + +DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs) +{ + DirectMessageThreads res; + res.state.reserve(refs.size()); + for (const auto & ref : refs) + res.state.push_back(Stored<DirectMessageState>::load(ref)); + return res; +} + +vector<Ref> DirectMessageThreads::store() const +{ + vector<Ref> refs; + refs.reserve(state.size()); + for (const auto & x : state) + refs.push_back(x.ref()); + return refs; +} + +vector<Stored<DirectMessageState>> DirectMessageThreads::data() const +{ + return state; +} + +bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const +{ + return state == other.state; +} + +bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const +{ + return state != other.state; +} + +DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const +{ + vector<Stored<DirectMessageData>> head; + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent)) + for (const auto & m : c->sent) + head.push_back(m); + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received)) + for (const auto & m : c->received) + head.push_back(m); + filterAncestors(head); + + return new DirectMessageThread::Priv { + .peer = peer, + .head = move(head), + }; +} + +vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads) +{ + return threads.data(); +} + + DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w) { watchers.push_back(w); @@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc DirectMessageService::DirectMessageService(Config && c, const Server & s): config(move(c)), - server(s) + server(s), + watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch( + std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1))) {} DirectMessageService::~DirectMessageService() = default; @@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx) return; auto powner = pid->finalOwner(); - unique_lock lock(threadLock); - - vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head); - head.push_back(Stored<DirectMessageData>::load(ctx.ref())); - filterAncestors(head); - auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head)); - - lock.unlock(); - - for (const auto & w : config.watchers) - w(dmt, -1, -1); + auto msg = Stored<DirectMessageData>::load(ctx.ref()); + + server.localHead().update([&](const Stored<LocalState> & loc) { + auto st = loc.ref().storage(); + auto threads = loc->shared<DirectMessageThreads>(); + + vector<Stored<DirectMessageData>> receivedOld; + for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received)) + for (const auto & m : c->received) + receivedOld.push_back(m); + auto receivedNew = receivedOld; + receivedNew.push_back(msg); + filterAncestors(receivedNew); + + if (receivedNew != receivedOld) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = powner, + .sent = {}, + .received = { msg }, + .seen = {}, + }); + + auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); + return res; + } else { + return loc; + } + }); } DirectMessageThread DirectMessageService::thread(const Identity & peer) { - scoped_lock lock(threadLock); - return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner()); + return server.localState().get().shared<DirectMessageThreads>().thread(peer); } DirectMessage DirectMessageService::send(const Identity & to, const string & text) { - scoped_lock lock(threadLock); - - auto msg = server.localHead().ref().storage().store(DirectMessageData { - .prev = DirectMessageThread::Priv::getThreadLocked(to).p->head, - .from = server.identity().finalOwner(), - .time = ZonedTime::now(), - .text = text, + Stored<DirectMessageData> msg; + + server.localHead().update([&](const Stored<LocalState> & loc) { + auto st = loc.ref().storage(); + + auto threads = loc->shared<DirectMessageThreads>(); + msg = st.store(DirectMessageData { + .prev = threads.thread(to).p->head, + .from = server.identity().finalOwner(), + .time = ZonedTime::now(), + .text = text, + }); + + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = to, + .sent = { msg }, + .received = {}, + .seen = {}, + }); + + return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); }); - DirectMessageThread::Priv::updateThreadLocked(to, { msg }); - if (auto peer = server.peer(to)) peer->send(myUUID, msg.ref()); return DirectMessage(new DirectMessage::Priv { - .data = msg, + .data = move(msg), }); } @@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text) return send(id->finalOwner(), text); throw std::runtime_error("peer without known identity"); } + +void DirectMessageService::updateHandler(const DirectMessageThreads & threads) +{ + scoped_lock lock(stateMutex); + + auto state = prevState; + for (const auto & s : threads.state) + state.push_back(s); + filterAncestors(state); + + if (state != prevState) { + auto queue = state; + vector<Identity> peers; + + while (not queue.empty()) { + auto cur = move(queue.back()); + queue.pop_back(); + + if (auto peer = cur->peer) { + bool found = false; + for (const auto & p : peers) { + if (p.sameAs(*peer)) { + found = true; + break; + } + } + + if (not found) + peers.push_back(*peer); + + for (const auto & prev : cur->prev) + queue.push_back(prev); + } + } + + for (const auto & peer : peers) { + auto dmt = threads.thread(peer); + for (const auto & w : config.watchers) + w(dmt, -1, -1); + } + + prevState = move(state); + } +} diff --git a/src/message.h b/src/message.h index c3c6ba4..4e99cd1 100644 --- a/src/message.h +++ b/src/message.h @@ -37,10 +37,6 @@ struct DirectMessageThread::Priv { const Identity peer; const vector<Stored<DirectMessageData>> head; - - static DirectMessageThread getThreadLocked(const Identity & peer); - static DirectMessageThread updateThreadLocked(const Identity & peer, - vector<Stored<DirectMessageData>> && head); }; struct DirectMessageThread::Iterator::Priv @@ -49,4 +45,17 @@ struct DirectMessageThread::Iterator::Priv vector<Stored<DirectMessageData>> next; }; +struct DirectMessageState +{ + static DirectMessageState load(const Ref &); + Ref store(const Storage &) const; + + vector<Stored<DirectMessageState>> prev; + optional<Identity> peer; + + vector<Stored<DirectMessageData>> sent; + vector<Stored<DirectMessageData>> received; + vector<Stored<DirectMessageData>> seen; +}; + } diff --git a/test/message.test b/test/message.test index 3f108d3..e317116 100644 --- a/test/message.test +++ b/test/message.test @@ -14,6 +14,10 @@ test DirectMessage: /peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1 /peer $peer2_1 id Device1 Owner1/ + with p1: + send "dm-list-peer $peer1_2" + expect /dm-list-done/ + # Send messages to peers for i in [1..2]: @@ -21,14 +25,14 @@ test DirectMessage: expect /dm-received from Owner1 text hello$i/ from p2 for i in [1..2]: - send "dm-send-peer $peer2_1 hello$i" to p2 - expect /dm-received from Owner2 text hello$i/ from p1 + send "dm-send-peer $peer2_1 hi$i" to p2 + expect /dm-received from Owner2 text hi$i/ from p1 for i in [3..4]: send "dm-send-peer $peer1_2 hello$i" to p1 expect /dm-received from Owner1 text hello$i/ from p2 - send "dm-send-peer $peer2_1 hello$i" to p2 - expect /dm-received from Owner2 text hello$i/ from p1 + send "dm-send-peer $peer2_1 hi$i" to p2 + expect /dm-received from Owner2 text hi$i/ from p1 # Create contacts @@ -56,15 +60,54 @@ test DirectMessage: # Send messages to contacts for i in [1..2]: - send "dm-send-contact $c1_2 hello$i" to p1 - expect /dm-received from Owner1 text hello$i/ from p2 + send "dm-send-contact $c1_2 hello_c_$i" to p1 + expect /dm-received from Owner1 text hello_c_$i/ from p2 for i in [1..2]: - send "dm-send-contact $c2_1 hello$i" to p2 - expect /dm-received from Owner2 text hello$i/ from p1 + send "dm-send-contact $c2_1 hi_c_$i" to p2 + expect /dm-received from Owner2 text hi_c_$i/ from p1 for i in [3..4]: - send "dm-send-contact $c1_2 hello$i" to p1 - expect /dm-received from Owner1 text hello$i/ from p2 - send "dm-send-contact $c2_1 hello$i" to p2 - expect /dm-received from Owner2 text hello$i/ from p1 + send "dm-send-contact $c1_2 hello_c_$i" to p1 + expect /dm-received from Owner1 text hello_c_$i/ from p2 + send "dm-send-contact $c2_1 hi_c_$i" to p2 + expect /dm-received from Owner2 text hi_c_$i/ from p1 + + send "dm-list-contact $c1_2" to p1 + send "dm-list-contact $c2_1" to p2 + for p in [p1, p2]: + with p: + for i in [1..4]: + expect /dm-list-item from Owner1 text hello_c_$i/ + expect /dm-list-item from Owner2 text hi_c_$i/ + for i in [1..4]: + expect /dm-list-item from Owner1 text hello$i/ + expect /dm-list-item from Owner2 text hi$i/ + expect /dm-list-(.*)/ capture done + guard (done == "done") + + # Reload message history + + with p1: + send "stop-server" + expect /stop-server-done/ + send "start-server" + + send "contact-list" + expect: + /contact-list-item $c1_2 Owner2 Owner2/ + /contact-list-(.*)/ capture done + guard (done == "done") + + send "dm-list-contact $c1_2" to p1 + send "dm-list-contact $c2_1" to p2 + for p in [p1, p2]: + with p: + for i in [1..4]: + expect /dm-list-item from Owner1 text hello_c_$i/ + expect /dm-list-item from Owner2 text hi_c_$i/ + for i in [1..4]: + expect /dm-list-item from Owner1 text hello$i/ + expect /dm-list-item from Owner2 text hi$i/ + expect /dm-list-(.*)/ capture done + guard (done == "done") |