diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/message.cpp | 58 | ||||
| -rw-r--r-- | src/message.h | 1 | 
2 files changed, 53 insertions, 6 deletions
| diff --git a/src/message.cpp b/src/message.cpp index 54c080e..349accb 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -3,9 +3,13 @@  #include <erebos/contact.h>  #include <erebos/network.h> +#include <iostream> +#include <thread> +  using namespace erebos;  using std::nullopt;  using std::scoped_lock; +using std::unique_lock;  static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); @@ -317,9 +321,20 @@ DirectMessageService::DirectMessageService(Config && c, const Server & s):  {  	server.peerList().onUpdate(std::bind(&DirectMessageService::peerWatcher, this,  				std::placeholders::_1, std::placeholders::_2)); + +	peerSyncRun = true; +	peerSyncThread = std::thread(&DirectMessageService::doSyncWithPeers, this);  } -DirectMessageService::~DirectMessageService() = default; +DirectMessageService::~DirectMessageService() +{ +	{ +		scoped_lock lock(peerSyncMutex); +		peerSyncRun = false; +	} +	peerSyncCond.notify_all(); +	peerSyncThread.join(); +}  UUID DirectMessageService::uuid() const  { @@ -469,7 +484,7 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads)  				w(dmt, -1, -1);  			if (auto netPeer = server.peer(peer)) -				syncWithPeer(server.localHead(), dmt, *netPeer); +				syncWithPeer(dmt, *netPeer);  		}  		prevState = move(state); @@ -480,17 +495,48 @@ void DirectMessageService::peerWatcher(size_t, const class Peer * peer)  {  	if (peer) {  		if (auto pid = peer->identity()) { -			syncWithPeer(server.localHead(), -					thread(pid->finalOwner()), *peer); +			syncWithPeer(thread(pid->finalOwner()), *peer); +		} +	} +} + +void DirectMessageService::syncWithPeer(const DirectMessageThread & thread, const Peer & peer) +{ +	{ +		scoped_lock lock(peerSyncMutex); +		peerSyncQueue.emplace_back(thread, peer); +	} +	peerSyncCond.notify_one(); +} + +void DirectMessageService::doSyncWithPeers() +{ +	unique_lock lock(peerSyncMutex); + +	while (peerSyncRun) +	{ +		if (peerSyncQueue.empty()) { +			peerSyncCond.wait(lock); +			continue;  		} + +		auto & [ thread, peer ] = peerSyncQueue.front(); +		lock.unlock(); + +		doSyncWithPeer(thread, peer); + +		lock.lock(); +		peerSyncQueue.pop_front();  	}  } -void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer) +void DirectMessageService::doSyncWithPeer(const DirectMessageThread & thread, const Peer & peer)  {  	for (const auto & msg : thread.p->head) -		peer.send(myUUID, msg.ref()); +		if (not peer.send(myUUID, msg.ref())) +			return; +	const auto & head = server.localHead();  	head.update([&](const Stored<LocalState> & loc) {  		auto st = head.storage(); diff --git a/src/message.h b/src/message.h index f729700..22da0fd 100644 --- a/src/message.h +++ b/src/message.h @@ -5,6 +5,7 @@  #include <erebos/storage.h>  #include <erebos/time.h> +#include <chrono>  #include <mutex>  #include <vector> |