summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/message.cpp58
-rw-r--r--src/message.h1
2 files changed, 53 insertions, 6 deletions
diff --git a/src/message.cpp b/src/message.cpp
index 54c080e..349accb 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -3,9 +3,13 @@
#include <erebos/contact.h>
#include <erebos/network.h>
+#include <iostream>
+#include <thread>
+
using namespace erebos;
using std::nullopt;
using std::scoped_lock;
+using std::unique_lock;
static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
@@ -317,9 +321,20 @@ DirectMessageService::DirectMessageService(Config && c, const Server & s):
{
server.peerList().onUpdate(std::bind(&DirectMessageService::peerWatcher, this,
std::placeholders::_1, std::placeholders::_2));
+
+ peerSyncRun = true;
+ peerSyncThread = std::thread(&DirectMessageService::doSyncWithPeers, this);
}
-DirectMessageService::~DirectMessageService() = default;
+DirectMessageService::~DirectMessageService()
+{
+ {
+ scoped_lock lock(peerSyncMutex);
+ peerSyncRun = false;
+ }
+ peerSyncCond.notify_all();
+ peerSyncThread.join();
+}
UUID DirectMessageService::uuid() const
{
@@ -469,7 +484,7 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads)
w(dmt, -1, -1);
if (auto netPeer = server.peer(peer))
- syncWithPeer(server.localHead(), dmt, *netPeer);
+ syncWithPeer(dmt, *netPeer);
}
prevState = move(state);
@@ -480,17 +495,48 @@ void DirectMessageService::peerWatcher(size_t, const class Peer * peer)
{
if (peer) {
if (auto pid = peer->identity()) {
- syncWithPeer(server.localHead(),
- thread(pid->finalOwner()), *peer);
+ syncWithPeer(thread(pid->finalOwner()), *peer);
+ }
+ }
+}
+
+void DirectMessageService::syncWithPeer(const DirectMessageThread & thread, const Peer & peer)
+{
+ {
+ scoped_lock lock(peerSyncMutex);
+ peerSyncQueue.emplace_back(thread, peer);
+ }
+ peerSyncCond.notify_one();
+}
+
+void DirectMessageService::doSyncWithPeers()
+{
+ unique_lock lock(peerSyncMutex);
+
+ while (peerSyncRun)
+ {
+ if (peerSyncQueue.empty()) {
+ peerSyncCond.wait(lock);
+ continue;
}
+
+ auto & [ thread, peer ] = peerSyncQueue.front();
+ lock.unlock();
+
+ doSyncWithPeer(thread, peer);
+
+ lock.lock();
+ peerSyncQueue.pop_front();
}
}
-void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer)
+void DirectMessageService::doSyncWithPeer(const DirectMessageThread & thread, const Peer & peer)
{
for (const auto & msg : thread.p->head)
- peer.send(myUUID, msg.ref());
+ if (not peer.send(myUUID, msg.ref()))
+ return;
+ const auto & head = server.localHead();
head.update([&](const Stored<LocalState> & loc) {
auto st = head.storage();
diff --git a/src/message.h b/src/message.h
index f729700..22da0fd 100644
--- a/src/message.h
+++ b/src/message.h
@@ -5,6 +5,7 @@
#include <erebos/storage.h>
#include <erebos/time.h>
+#include <chrono>
#include <mutex>
#include <vector>