diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2023-07-01 20:08:48 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-07-01 22:07:27 +0200 | 
| commit | 9577599e6af4dfb36df81fe5d89801c4ce4a19ab (patch) | |
| tree | 1d383f6f3466c3c2198b4de2a2f53b6ad500fe39 | |
| parent | afab7dc5673fbc5fd600182612626676ae36d1c0 (diff) | |
Direct message shared state
| -rw-r--r-- | include/erebos/message.h | 43 | ||||
| -rw-r--r-- | include/erebos/storage.h | 1 | ||||
| -rw-r--r-- | src/main.cpp | 35 | ||||
| -rw-r--r-- | src/message.cpp | 282 | ||||
| -rw-r--r-- | src/message.h | 17 | ||||
| -rw-r--r-- | test/message.test | 67 | 
6 files changed, 377 insertions, 68 deletions
| diff --git a/include/erebos/message.h b/include/erebos/message.h index 011007a..194044c 100644 --- a/include/erebos/message.h +++ b/include/erebos/message.h @@ -1,19 +1,23 @@  #pragma once +#include <erebos/merge.h>  #include <erebos/service.h>  #include <chrono>  #include <functional>  #include <memory> +#include <mutex>  #include <optional>  #include <string>  namespace erebos { +using std::mutex;  using std::unique_ptr;  class Contact;  class Identity; +struct DirectMessageState;  class DirectMessage  { @@ -67,11 +71,43 @@ public:  private:  	friend class DirectMessageService; +	friend class DirectMessageThreads;  	struct Priv;  	DirectMessageThread(Priv *);  	std::shared_ptr<Priv> p;  }; +class DirectMessageThreads +{ +public: +	DirectMessageThreads(); +	DirectMessageThreads(Stored<DirectMessageState>); +	DirectMessageThreads(vector<Stored<DirectMessageState>>); + +	static DirectMessageThreads load(const vector<Ref> & refs); +	vector<Ref> store() const; +	vector<Stored<DirectMessageState>> data() const; + +	bool operator==(const DirectMessageThreads &) const; +	bool operator!=(const DirectMessageThreads &) const; + +	DirectMessageThread thread(const Identity &) const; + +private: +	vector<Stored<DirectMessageState>> state; + +	friend class DirectMessageService; +}; + +DECLARE_SHARED_TYPE(DirectMessageThreads) + +template<> struct Mergeable<DirectMessageThreads> +{ +	using Component = DirectMessageState; +	static vector<Stored<DirectMessageState>> components(const DirectMessageThreads &); +	static Contact merge(vector<Stored<DirectMessageState>>); +}; +  class DirectMessageService : public Service  {  public: @@ -100,8 +136,15 @@ public:  	DirectMessage send(const Peer &, const std::string &);  private: +	void updateHandler(const DirectMessageThreads &); +  	const Config config;  	const Server & server; + +	vector<Stored<DirectMessageState>> prevState; +	mutex stateMutex; + +	Watched<DirectMessageThreads> watched;  };  } diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 32049db..8a701e9 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -474,6 +474,7 @@ class Stored  	friend class Storage;  	friend class Head<T>;  public: +	Stored() = default;  	Stored(const Stored &) = default;  	Stored(Stored &&) = default;  	Stored & operator=(const Stored &) = default; diff --git a/src/main.cpp b/src/main.cpp index f9c5e98..b39130a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -289,6 +289,9 @@ void startServer(const vector<string> &)  	config.service<DirectMessageService>()  		.onUpdate([](const DirectMessageThread & thread, ssize_t, ssize_t) { +			if (thread.at(0).from()->sameAs(server->identity())) +				return; +  			ostringstream ss;  			string name = "<unnamed>"; @@ -550,6 +553,36 @@ void dmSendContact(const vector<string> & args)  			args.at(1));  } +template<class T> +static void dmList(const T & peer) +{ +	if (auto id = peer.identity()) +		for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) { +			string name = "<unnamed>"; +			if (const auto & from = msg.from()) +				if (const auto & opt = from->name()) +					name = *opt; + +			ostringstream ss; +			ss << "dm-list-item" +				<< " from " << name +				<< " text " << msg.text() +				; +			printLine(ss.str()); +		} +	printLine("dm-list-done"); +} + +void dmListPeer(const vector<string> & args) +{ +	dmList(getPeer(args.at(0)).peer); +} + +void dmListContact(const vector<string> & args) +{ +	dmList(getContact(args.at(0))); +} +  vector<Command> commands = {  	{ "store", store },  	{ "stored-generation", storedGeneration }, @@ -576,6 +609,8 @@ vector<Command> commands = {  	{ "contact-set-name", contactSetName },  	{ "dm-send-peer", dmSendPeer },  	{ "dm-send-contact", dmSendContact }, +	{ "dm-list-peer", dmListPeer }, +	{ "dm-list-contact", dmListContact },  };  } diff --git a/src/message.cpp b/src/message.cpp index 06ee8ad..8ae1601 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -6,42 +6,39 @@  using namespace erebos;  using std::nullopt;  using std::scoped_lock; -using std::unique_lock;  static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); -static vector<DirectMessageThread> threadList; -static mutex threadLock; +DEFINE_SHARED_TYPE(DirectMessageThreads, +		"ee793681-5976-466a-b0f0-4e1907d3fade", +		&DirectMessageThreads::load, +		[](const DirectMessageThreads & threads) { +			return threads.store(); +		}) -DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) -{ -	for (const auto & t : threadList) -		if (t.p->peer.sameAs(peer)) -			return t; -	DirectMessageThread t(new DirectMessageThread::Priv { -		.peer = peer, -		.head = {}, -	}); -	threadList.push_back(t); -	return t; +static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates, +		const Stored<DirectMessageState> & cur, +		const Identity & peer, +		vector<Stored<DirectMessageData>> DirectMessageState::* sel) +{ +	if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty()) +		candidates.push_back(cur); +	else +		for (const auto & p : cur->prev) +			findThreadComponents(candidates, p, peer, sel);  } -DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head) +static vector<Stored<DirectMessageState>> findThreadComponents( +		const vector<Stored<DirectMessageState>> & leaves, +		const Identity & peer, +		vector<Stored<DirectMessageData>> DirectMessageState::* sel)  { -	DirectMessageThread nt(new DirectMessageThread::Priv { -		.peer = peer, -		.head = std::move(head), -	}); - -	for (auto & t : threadList) -		if (t.p->peer.sameAs(peer)) { -			t = nt; -			return nt; -		} - -	threadList.push_back(nt); -	return nt; +	vector<Stored<DirectMessageState>> candidates; +	for (const auto & obj : leaves) +		findThreadComponents(candidates, obj, peer, sel); +	filterAncestors(candidates); +	return candidates;  } @@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const  } +DirectMessageState DirectMessageState::load(const Ref & ref) +{ +	if (auto rec = ref->asRecord()) { +		return DirectMessageState { +			.prev = rec->items("PREV").as<DirectMessageState>(), +			.peer = Identity::load(rec->items("peer").asRef()), + +			.sent = rec->items("sent").as<DirectMessageData>(), +			.received = rec->items("received").as<DirectMessageData>(), +			.seen = rec->items("seen").as<DirectMessageData>(), +		}; +	} + +	return DirectMessageState(); +} + +Ref DirectMessageState::store(const Storage & st) const +{ +	vector<Record::Item> items; + +	for (const auto & prev : prev) +		items.emplace_back("PREV", prev.ref()); +	if (peer) +		for (const auto & ref : peer->refs()) +			items.emplace_back("peer", ref); + +	for (const auto & x : sent) +		items.emplace_back("sent", x.ref()); +	for (const auto & x : received) +		items.emplace_back("received", x.ref()); +	for (const auto & x : seen) +		items.emplace_back("seen", x.ref()); + +	return st.storeObject(Record(std::move(items))); +} + + +DirectMessageThreads::DirectMessageThreads() = default; + +DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s): +	DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) }) +{ +} + +DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s): +	state(move(s)) +{ +} + +DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs) +{ +	DirectMessageThreads res; +	res.state.reserve(refs.size()); +	for (const auto & ref : refs) +		res.state.push_back(Stored<DirectMessageState>::load(ref)); +	return res; +} + +vector<Ref> DirectMessageThreads::store() const +{ +	vector<Ref> refs; +	refs.reserve(state.size()); +	for (const auto & x : state) +		refs.push_back(x.ref()); +	return refs; +} + +vector<Stored<DirectMessageState>> DirectMessageThreads::data() const +{ +	return state; +} + +bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const +{ +	return state == other.state; +} + +bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const +{ +	return state != other.state; +} + +DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const +{ +	vector<Stored<DirectMessageData>> head; +	for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent)) +		for (const auto & m : c->sent) +			head.push_back(m); +	for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received)) +		for (const auto & m : c->received) +			head.push_back(m); +	filterAncestors(head); + +	return new DirectMessageThread::Priv { +		.peer = peer, +		.head = move(head), +	}; +} + +vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads) +{ +	return threads.data(); +} + +  DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w)  {  	watchers.push_back(w); @@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc  DirectMessageService::DirectMessageService(Config && c, const Server & s):  	config(move(c)), -	server(s) +	server(s), +	watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch( +				std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1)))  {}  DirectMessageService::~DirectMessageService() = default; @@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx)  		return;  	auto powner = pid->finalOwner(); -	unique_lock lock(threadLock); - -	vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head); -	head.push_back(Stored<DirectMessageData>::load(ctx.ref())); -	filterAncestors(head); -	auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head)); - -	lock.unlock(); - -	for (const auto & w : config.watchers) -		w(dmt, -1, -1); +	auto msg = Stored<DirectMessageData>::load(ctx.ref()); + +	server.localHead().update([&](const Stored<LocalState> & loc) { +		auto st = loc.ref().storage(); +		auto threads = loc->shared<DirectMessageThreads>(); + +		vector<Stored<DirectMessageData>> receivedOld; +		for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received)) +			for (const auto & m : c->received) +				receivedOld.push_back(m); +		auto receivedNew = receivedOld; +		receivedNew.push_back(msg); +		filterAncestors(receivedNew); + +		if (receivedNew != receivedOld) { +			auto state = st.store(DirectMessageState { +				.prev = threads.data(), +				.peer = powner, +				.sent = {}, +				.received = { msg }, +				.seen = {}, +			}); + +			auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); +			return res; +		} else { +			return loc; +		} +	});  }  DirectMessageThread DirectMessageService::thread(const Identity & peer)  { -	scoped_lock lock(threadLock); -	return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner()); +	return server.localState().get().shared<DirectMessageThreads>().thread(peer);  }  DirectMessage DirectMessageService::send(const Identity & to, const string & text)  { -	scoped_lock lock(threadLock); - -	auto msg = server.localHead().ref().storage().store(DirectMessageData { -		.prev = DirectMessageThread::Priv::getThreadLocked(to).p->head, -		.from = server.identity().finalOwner(), -		.time = ZonedTime::now(), -		.text = text, +	Stored<DirectMessageData> msg; + +	server.localHead().update([&](const Stored<LocalState> & loc) { +		auto st = loc.ref().storage(); + +		auto threads = loc->shared<DirectMessageThreads>(); +		msg = st.store(DirectMessageData { +			.prev = threads.thread(to).p->head, +			.from = server.identity().finalOwner(), +			.time = ZonedTime::now(), +			.text = text, +		}); + +		auto state = st.store(DirectMessageState { +			.prev = threads.data(), +			.peer = to, +			.sent = { msg }, +			.received = {}, +			.seen = {}, +		}); + +		return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));  	}); -	DirectMessageThread::Priv::updateThreadLocked(to, { msg }); -  	if (auto peer = server.peer(to))  		peer->send(myUUID, msg.ref());  	return DirectMessage(new DirectMessage::Priv { -		.data = msg, +		.data = move(msg),  	});  } @@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text)  		return send(id->finalOwner(), text);  	throw std::runtime_error("peer without known identity");  } + +void DirectMessageService::updateHandler(const DirectMessageThreads & threads) +{ +	scoped_lock lock(stateMutex); + +	auto state = prevState; +	for (const auto & s : threads.state) +		state.push_back(s); +	filterAncestors(state); + +	if (state != prevState) { +		auto queue = state; +		vector<Identity> peers; + +		while (not queue.empty()) { +			auto cur = move(queue.back()); +			queue.pop_back(); + +			if (auto peer = cur->peer) { +				bool found = false; +				for (const auto & p : peers) { +					if (p.sameAs(*peer)) { +						found = true; +						break; +					} +				} + +				if (not found) +					peers.push_back(*peer); + +				for (const auto & prev : cur->prev) +					queue.push_back(prev); +			} +		} + +		for (const auto & peer : peers) { +			auto dmt = threads.thread(peer); +			for (const auto & w : config.watchers) +				w(dmt, -1, -1); +		} + +		prevState = move(state); +	} +} diff --git a/src/message.h b/src/message.h index c3c6ba4..4e99cd1 100644 --- a/src/message.h +++ b/src/message.h @@ -37,10 +37,6 @@ struct DirectMessageThread::Priv  {  	const Identity peer;  	const vector<Stored<DirectMessageData>> head; - -	static DirectMessageThread getThreadLocked(const Identity & peer); -	static DirectMessageThread updateThreadLocked(const Identity & peer, -			vector<Stored<DirectMessageData>> && head);  };  struct DirectMessageThread::Iterator::Priv @@ -49,4 +45,17 @@ struct DirectMessageThread::Iterator::Priv  	vector<Stored<DirectMessageData>> next;  }; +struct DirectMessageState +{ +	static DirectMessageState load(const Ref &); +	Ref store(const Storage &) const; + +	vector<Stored<DirectMessageState>> prev; +	optional<Identity> peer; + +	vector<Stored<DirectMessageData>> sent; +	vector<Stored<DirectMessageData>> received; +	vector<Stored<DirectMessageData>> seen; +}; +  } diff --git a/test/message.test b/test/message.test index 3f108d3..e317116 100644 --- a/test/message.test +++ b/test/message.test @@ -14,6 +14,10 @@ test DirectMessage:  		/peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1  		/peer $peer2_1 id Device1 Owner1/ +	with p1: +		send "dm-list-peer $peer1_2" +		expect /dm-list-done/ +  	# Send messages to peers  	for i in [1..2]: @@ -21,14 +25,14 @@ test DirectMessage:  		expect /dm-received from Owner1 text hello$i/ from p2  	for i in [1..2]: -		send "dm-send-peer $peer2_1 hello$i" to p2 -		expect /dm-received from Owner2 text hello$i/ from p1 +		send "dm-send-peer $peer2_1 hi$i" to p2 +		expect /dm-received from Owner2 text hi$i/ from p1  	for i in [3..4]:  		send "dm-send-peer $peer1_2 hello$i" to p1  		expect /dm-received from Owner1 text hello$i/ from p2 -		send "dm-send-peer $peer2_1 hello$i" to p2 -		expect /dm-received from Owner2 text hello$i/ from p1 +		send "dm-send-peer $peer2_1 hi$i" to p2 +		expect /dm-received from Owner2 text hi$i/ from p1  	# Create contacts @@ -56,15 +60,54 @@ test DirectMessage:  	# Send messages to contacts  	for i in [1..2]: -		send "dm-send-contact $c1_2 hello$i" to p1 -		expect /dm-received from Owner1 text hello$i/ from p2 +		send "dm-send-contact $c1_2 hello_c_$i" to p1 +		expect /dm-received from Owner1 text hello_c_$i/ from p2  	for i in [1..2]: -		send "dm-send-contact $c2_1 hello$i" to p2 -		expect /dm-received from Owner2 text hello$i/ from p1 +		send "dm-send-contact $c2_1 hi_c_$i" to p2 +		expect /dm-received from Owner2 text hi_c_$i/ from p1  	for i in [3..4]: -		send "dm-send-contact $c1_2 hello$i" to p1 -		expect /dm-received from Owner1 text hello$i/ from p2 -		send "dm-send-contact $c2_1 hello$i" to p2 -		expect /dm-received from Owner2 text hello$i/ from p1 +		send "dm-send-contact $c1_2 hello_c_$i" to p1 +		expect /dm-received from Owner1 text hello_c_$i/ from p2 +		send "dm-send-contact $c2_1 hi_c_$i" to p2 +		expect /dm-received from Owner2 text hi_c_$i/ from p1 + +	send "dm-list-contact $c1_2" to p1 +	send "dm-list-contact $c2_1" to p2 +	for p in [p1, p2]: +		with p: +			for i in [1..4]: +				expect /dm-list-item from Owner1 text hello_c_$i/ +				expect /dm-list-item from Owner2 text hi_c_$i/ +			for i in [1..4]: +				expect /dm-list-item from Owner1 text hello$i/ +				expect /dm-list-item from Owner2 text hi$i/ +			expect /dm-list-(.*)/ capture done +			guard (done == "done") + +	# Reload message history + +	with p1: +		send "stop-server" +		expect /stop-server-done/ +		send "start-server" + +		send "contact-list" +		expect: +			/contact-list-item $c1_2 Owner2 Owner2/ +			/contact-list-(.*)/ capture done +		guard (done == "done") + +	send "dm-list-contact $c1_2" to p1 +	send "dm-list-contact $c2_1" to p2 +	for p in [p1, p2]: +		with p: +			for i in [1..4]: +				expect /dm-list-item from Owner1 text hello_c_$i/ +				expect /dm-list-item from Owner2 text hi_c_$i/ +			for i in [1..4]: +				expect /dm-list-item from Owner1 text hello$i/ +				expect /dm-list-item from Owner2 text hi$i/ +			expect /dm-list-(.*)/ capture done +			guard (done == "done") |