diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2024-02-03 11:17:13 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-02-03 11:27:59 +0100 | 
| commit | a28d1daed8d89fceaa014a244a506d8092d6ca7c (patch) | |
| tree | fd0209d1e4be68d6b8968d8f5568bcef42edaedd | |
| parent | 33773031d59762a28fbc4673c7df2dd378b79d0e (diff) | |
Message: separate thread for peer synchronization
| -rw-r--r-- | include/erebos/message.h | 17 | ||||
| -rw-r--r-- | src/message.cpp | 58 | ||||
| -rw-r--r-- | src/message.h | 1 | 
3 files changed, 68 insertions, 8 deletions
| diff --git a/include/erebos/message.h b/include/erebos/message.h index a74e6a3..b52b84b 100644 --- a/include/erebos/message.h +++ b/include/erebos/message.h @@ -3,16 +3,21 @@  #include <erebos/merge.h>  #include <erebos/service.h> -#include <chrono> +#include <condition_variable> +#include <deque>  #include <functional>  #include <memory>  #include <mutex>  #include <optional>  #include <string> +#include <tuple>  namespace erebos { +using std::condition_variable; +using std::deque;  using std::mutex; +using std::tuple;  using std::unique_ptr;  class Contact; @@ -142,7 +147,9 @@ public:  private:  	void updateHandler(const DirectMessageThreads &);  	void peerWatcher(size_t, const class Peer *); -	static void syncWithPeer(const Head<LocalState> &, const DirectMessageThread &, const Peer &); +	void syncWithPeer(const DirectMessageThread &, const Peer &); +	void doSyncWithPeers(); +	void doSyncWithPeer(const DirectMessageThread &, const Peer &);  	const Config config;  	const Server & server; @@ -150,6 +157,12 @@ private:  	vector<Stored<DirectMessageState>> prevState;  	mutex stateMutex; +	mutex peerSyncMutex; +	condition_variable peerSyncCond; +	bool peerSyncRun; +	deque<tuple<DirectMessageThread, Peer>> peerSyncQueue; +	std::thread peerSyncThread; +  	Watched<DirectMessageThreads> watched;  }; diff --git a/src/message.cpp b/src/message.cpp index 54c080e..349accb 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -3,9 +3,13 @@  #include <erebos/contact.h>  #include <erebos/network.h> +#include <iostream> +#include <thread> +  using namespace erebos;  using std::nullopt;  using std::scoped_lock; +using std::unique_lock;  static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); @@ -317,9 +321,20 @@ DirectMessageService::DirectMessageService(Config && c, const Server & s):  {  	server.peerList().onUpdate(std::bind(&DirectMessageService::peerWatcher, this,  				std::placeholders::_1, std::placeholders::_2)); + +	peerSyncRun = true; +	peerSyncThread = std::thread(&DirectMessageService::doSyncWithPeers, this);  } -DirectMessageService::~DirectMessageService() = default; +DirectMessageService::~DirectMessageService() +{ +	{ +		scoped_lock lock(peerSyncMutex); +		peerSyncRun = false; +	} +	peerSyncCond.notify_all(); +	peerSyncThread.join(); +}  UUID DirectMessageService::uuid() const  { @@ -469,7 +484,7 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads)  				w(dmt, -1, -1);  			if (auto netPeer = server.peer(peer)) -				syncWithPeer(server.localHead(), dmt, *netPeer); +				syncWithPeer(dmt, *netPeer);  		}  		prevState = move(state); @@ -480,17 +495,48 @@ void DirectMessageService::peerWatcher(size_t, const class Peer * peer)  {  	if (peer) {  		if (auto pid = peer->identity()) { -			syncWithPeer(server.localHead(), -					thread(pid->finalOwner()), *peer); +			syncWithPeer(thread(pid->finalOwner()), *peer); +		} +	} +} + +void DirectMessageService::syncWithPeer(const DirectMessageThread & thread, const Peer & peer) +{ +	{ +		scoped_lock lock(peerSyncMutex); +		peerSyncQueue.emplace_back(thread, peer); +	} +	peerSyncCond.notify_one(); +} + +void DirectMessageService::doSyncWithPeers() +{ +	unique_lock lock(peerSyncMutex); + +	while (peerSyncRun) +	{ +		if (peerSyncQueue.empty()) { +			peerSyncCond.wait(lock); +			continue;  		} + +		auto & [ thread, peer ] = peerSyncQueue.front(); +		lock.unlock(); + +		doSyncWithPeer(thread, peer); + +		lock.lock(); +		peerSyncQueue.pop_front();  	}  } -void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer) +void DirectMessageService::doSyncWithPeer(const DirectMessageThread & thread, const Peer & peer)  {  	for (const auto & msg : thread.p->head) -		peer.send(myUUID, msg.ref()); +		if (not peer.send(myUUID, msg.ref())) +			return; +	const auto & head = server.localHead();  	head.update([&](const Stored<LocalState> & loc) {  		auto st = head.storage(); diff --git a/src/message.h b/src/message.h index f729700..22da0fd 100644 --- a/src/message.h +++ b/src/message.h @@ -5,6 +5,7 @@  #include <erebos/storage.h>  #include <erebos/time.h> +#include <chrono>  #include <mutex>  #include <vector> |