diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2023-07-01 20:08:48 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-07-01 22:07:27 +0200 | 
| commit | 9577599e6af4dfb36df81fe5d89801c4ce4a19ab (patch) | |
| tree | 1d383f6f3466c3c2198b4de2a2f53b6ad500fe39 /src | |
| parent | afab7dc5673fbc5fd600182612626676ae36d1c0 (diff) | |
Direct message shared state
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.cpp | 35 | ||||
| -rw-r--r-- | src/message.cpp | 282 | ||||
| -rw-r--r-- | src/message.h | 17 | 
3 files changed, 278 insertions, 56 deletions
| diff --git a/src/main.cpp b/src/main.cpp index f9c5e98..b39130a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -289,6 +289,9 @@ void startServer(const vector<string> &)  	config.service<DirectMessageService>()  		.onUpdate([](const DirectMessageThread & thread, ssize_t, ssize_t) { +			if (thread.at(0).from()->sameAs(server->identity())) +				return; +  			ostringstream ss;  			string name = "<unnamed>"; @@ -550,6 +553,36 @@ void dmSendContact(const vector<string> & args)  			args.at(1));  } +template<class T> +static void dmList(const T & peer) +{ +	if (auto id = peer.identity()) +		for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) { +			string name = "<unnamed>"; +			if (const auto & from = msg.from()) +				if (const auto & opt = from->name()) +					name = *opt; + +			ostringstream ss; +			ss << "dm-list-item" +				<< " from " << name +				<< " text " << msg.text() +				; +			printLine(ss.str()); +		} +	printLine("dm-list-done"); +} + +void dmListPeer(const vector<string> & args) +{ +	dmList(getPeer(args.at(0)).peer); +} + +void dmListContact(const vector<string> & args) +{ +	dmList(getContact(args.at(0))); +} +  vector<Command> commands = {  	{ "store", store },  	{ "stored-generation", storedGeneration }, @@ -576,6 +609,8 @@ vector<Command> commands = {  	{ "contact-set-name", contactSetName },  	{ "dm-send-peer", dmSendPeer },  	{ "dm-send-contact", dmSendContact }, +	{ "dm-list-peer", dmListPeer }, +	{ "dm-list-contact", dmListContact },  };  } diff --git a/src/message.cpp b/src/message.cpp index 06ee8ad..8ae1601 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -6,42 +6,39 @@  using namespace erebos;  using std::nullopt;  using std::scoped_lock; -using std::unique_lock;  static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); -static vector<DirectMessageThread> threadList; -static mutex threadLock; +DEFINE_SHARED_TYPE(DirectMessageThreads, +		"ee793681-5976-466a-b0f0-4e1907d3fade", +		&DirectMessageThreads::load, +		[](const DirectMessageThreads & threads) { +			return threads.store(); +		}) -DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) -{ -	for (const auto & t : threadList) -		if (t.p->peer.sameAs(peer)) -			return t; -	DirectMessageThread t(new DirectMessageThread::Priv { -		.peer = peer, -		.head = {}, -	}); -	threadList.push_back(t); -	return t; +static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates, +		const Stored<DirectMessageState> & cur, +		const Identity & peer, +		vector<Stored<DirectMessageData>> DirectMessageState::* sel) +{ +	if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty()) +		candidates.push_back(cur); +	else +		for (const auto & p : cur->prev) +			findThreadComponents(candidates, p, peer, sel);  } -DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head) +static vector<Stored<DirectMessageState>> findThreadComponents( +		const vector<Stored<DirectMessageState>> & leaves, +		const Identity & peer, +		vector<Stored<DirectMessageData>> DirectMessageState::* sel)  { -	DirectMessageThread nt(new DirectMessageThread::Priv { -		.peer = peer, -		.head = std::move(head), -	}); - -	for (auto & t : threadList) -		if (t.p->peer.sameAs(peer)) { -			t = nt; -			return nt; -		} - -	threadList.push_back(nt); -	return nt; +	vector<Stored<DirectMessageState>> candidates; +	for (const auto & obj : leaves) +		findThreadComponents(candidates, obj, peer, sel); +	filterAncestors(candidates); +	return candidates;  } @@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const  } +DirectMessageState DirectMessageState::load(const Ref & ref) +{ +	if (auto rec = ref->asRecord()) { +		return DirectMessageState { +			.prev = rec->items("PREV").as<DirectMessageState>(), +			.peer = Identity::load(rec->items("peer").asRef()), + +			.sent = rec->items("sent").as<DirectMessageData>(), +			.received = rec->items("received").as<DirectMessageData>(), +			.seen = rec->items("seen").as<DirectMessageData>(), +		}; +	} + +	return DirectMessageState(); +} + +Ref DirectMessageState::store(const Storage & st) const +{ +	vector<Record::Item> items; + +	for (const auto & prev : prev) +		items.emplace_back("PREV", prev.ref()); +	if (peer) +		for (const auto & ref : peer->refs()) +			items.emplace_back("peer", ref); + +	for (const auto & x : sent) +		items.emplace_back("sent", x.ref()); +	for (const auto & x : received) +		items.emplace_back("received", x.ref()); +	for (const auto & x : seen) +		items.emplace_back("seen", x.ref()); + +	return st.storeObject(Record(std::move(items))); +} + + +DirectMessageThreads::DirectMessageThreads() = default; + +DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s): +	DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) }) +{ +} + +DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s): +	state(move(s)) +{ +} + +DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs) +{ +	DirectMessageThreads res; +	res.state.reserve(refs.size()); +	for (const auto & ref : refs) +		res.state.push_back(Stored<DirectMessageState>::load(ref)); +	return res; +} + +vector<Ref> DirectMessageThreads::store() const +{ +	vector<Ref> refs; +	refs.reserve(state.size()); +	for (const auto & x : state) +		refs.push_back(x.ref()); +	return refs; +} + +vector<Stored<DirectMessageState>> DirectMessageThreads::data() const +{ +	return state; +} + +bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const +{ +	return state == other.state; +} + +bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const +{ +	return state != other.state; +} + +DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const +{ +	vector<Stored<DirectMessageData>> head; +	for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent)) +		for (const auto & m : c->sent) +			head.push_back(m); +	for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received)) +		for (const auto & m : c->received) +			head.push_back(m); +	filterAncestors(head); + +	return new DirectMessageThread::Priv { +		.peer = peer, +		.head = move(head), +	}; +} + +vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads) +{ +	return threads.data(); +} + +  DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w)  {  	watchers.push_back(w); @@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc  DirectMessageService::DirectMessageService(Config && c, const Server & s):  	config(move(c)), -	server(s) +	server(s), +	watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch( +				std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1)))  {}  DirectMessageService::~DirectMessageService() = default; @@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx)  		return;  	auto powner = pid->finalOwner(); -	unique_lock lock(threadLock); - -	vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head); -	head.push_back(Stored<DirectMessageData>::load(ctx.ref())); -	filterAncestors(head); -	auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head)); - -	lock.unlock(); - -	for (const auto & w : config.watchers) -		w(dmt, -1, -1); +	auto msg = Stored<DirectMessageData>::load(ctx.ref()); + +	server.localHead().update([&](const Stored<LocalState> & loc) { +		auto st = loc.ref().storage(); +		auto threads = loc->shared<DirectMessageThreads>(); + +		vector<Stored<DirectMessageData>> receivedOld; +		for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received)) +			for (const auto & m : c->received) +				receivedOld.push_back(m); +		auto receivedNew = receivedOld; +		receivedNew.push_back(msg); +		filterAncestors(receivedNew); + +		if (receivedNew != receivedOld) { +			auto state = st.store(DirectMessageState { +				.prev = threads.data(), +				.peer = powner, +				.sent = {}, +				.received = { msg }, +				.seen = {}, +			}); + +			auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); +			return res; +		} else { +			return loc; +		} +	});  }  DirectMessageThread DirectMessageService::thread(const Identity & peer)  { -	scoped_lock lock(threadLock); -	return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner()); +	return server.localState().get().shared<DirectMessageThreads>().thread(peer);  }  DirectMessage DirectMessageService::send(const Identity & to, const string & text)  { -	scoped_lock lock(threadLock); - -	auto msg = server.localHead().ref().storage().store(DirectMessageData { -		.prev = DirectMessageThread::Priv::getThreadLocked(to).p->head, -		.from = server.identity().finalOwner(), -		.time = ZonedTime::now(), -		.text = text, +	Stored<DirectMessageData> msg; + +	server.localHead().update([&](const Stored<LocalState> & loc) { +		auto st = loc.ref().storage(); + +		auto threads = loc->shared<DirectMessageThreads>(); +		msg = st.store(DirectMessageData { +			.prev = threads.thread(to).p->head, +			.from = server.identity().finalOwner(), +			.time = ZonedTime::now(), +			.text = text, +		}); + +		auto state = st.store(DirectMessageState { +			.prev = threads.data(), +			.peer = to, +			.sent = { msg }, +			.received = {}, +			.seen = {}, +		}); + +		return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));  	}); -	DirectMessageThread::Priv::updateThreadLocked(to, { msg }); -  	if (auto peer = server.peer(to))  		peer->send(myUUID, msg.ref());  	return DirectMessage(new DirectMessage::Priv { -		.data = msg, +		.data = move(msg),  	});  } @@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text)  		return send(id->finalOwner(), text);  	throw std::runtime_error("peer without known identity");  } + +void DirectMessageService::updateHandler(const DirectMessageThreads & threads) +{ +	scoped_lock lock(stateMutex); + +	auto state = prevState; +	for (const auto & s : threads.state) +		state.push_back(s); +	filterAncestors(state); + +	if (state != prevState) { +		auto queue = state; +		vector<Identity> peers; + +		while (not queue.empty()) { +			auto cur = move(queue.back()); +			queue.pop_back(); + +			if (auto peer = cur->peer) { +				bool found = false; +				for (const auto & p : peers) { +					if (p.sameAs(*peer)) { +						found = true; +						break; +					} +				} + +				if (not found) +					peers.push_back(*peer); + +				for (const auto & prev : cur->prev) +					queue.push_back(prev); +			} +		} + +		for (const auto & peer : peers) { +			auto dmt = threads.thread(peer); +			for (const auto & w : config.watchers) +				w(dmt, -1, -1); +		} + +		prevState = move(state); +	} +} diff --git a/src/message.h b/src/message.h index c3c6ba4..4e99cd1 100644 --- a/src/message.h +++ b/src/message.h @@ -37,10 +37,6 @@ struct DirectMessageThread::Priv  {  	const Identity peer;  	const vector<Stored<DirectMessageData>> head; - -	static DirectMessageThread getThreadLocked(const Identity & peer); -	static DirectMessageThread updateThreadLocked(const Identity & peer, -			vector<Stored<DirectMessageData>> && head);  };  struct DirectMessageThread::Iterator::Priv @@ -49,4 +45,17 @@ struct DirectMessageThread::Iterator::Priv  	vector<Stored<DirectMessageData>> next;  }; +struct DirectMessageState +{ +	static DirectMessageState load(const Ref &); +	Ref store(const Storage &) const; + +	vector<Stored<DirectMessageState>> prev; +	optional<Identity> peer; + +	vector<Stored<DirectMessageData>> sent; +	vector<Stored<DirectMessageData>> received; +	vector<Stored<DirectMessageData>> seen; +}; +  } |