summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-02-03 11:17:13 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2024-02-03 11:27:59 +0100
commita28d1daed8d89fceaa014a244a506d8092d6ca7c (patch)
treefd0209d1e4be68d6b8968d8f5568bcef42edaedd
parent33773031d59762a28fbc4673c7df2dd378b79d0e (diff)
Message: separate thread for peer synchronization
-rw-r--r--include/erebos/message.h17
-rw-r--r--src/message.cpp58
-rw-r--r--src/message.h1
3 files changed, 68 insertions, 8 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h
index a74e6a3..b52b84b 100644
--- a/include/erebos/message.h
+++ b/include/erebos/message.h
@@ -3,16 +3,21 @@
#include <erebos/merge.h>
#include <erebos/service.h>
-#include <chrono>
+#include <condition_variable>
+#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
+#include <tuple>
namespace erebos {
+using std::condition_variable;
+using std::deque;
using std::mutex;
+using std::tuple;
using std::unique_ptr;
class Contact;
@@ -142,7 +147,9 @@ public:
private:
void updateHandler(const DirectMessageThreads &);
void peerWatcher(size_t, const class Peer *);
- static void syncWithPeer(const Head<LocalState> &, const DirectMessageThread &, const Peer &);
+ void syncWithPeer(const DirectMessageThread &, const Peer &);
+ void doSyncWithPeers();
+ void doSyncWithPeer(const DirectMessageThread &, const Peer &);
const Config config;
const Server & server;
@@ -150,6 +157,12 @@ private:
vector<Stored<DirectMessageState>> prevState;
mutex stateMutex;
+ mutex peerSyncMutex;
+ condition_variable peerSyncCond;
+ bool peerSyncRun;
+ deque<tuple<DirectMessageThread, Peer>> peerSyncQueue;
+ std::thread peerSyncThread;
+
Watched<DirectMessageThreads> watched;
};
diff --git a/src/message.cpp b/src/message.cpp
index 54c080e..349accb 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -3,9 +3,13 @@
#include <erebos/contact.h>
#include <erebos/network.h>
+#include <iostream>
+#include <thread>
+
using namespace erebos;
using std::nullopt;
using std::scoped_lock;
+using std::unique_lock;
static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d");
@@ -317,9 +321,20 @@ DirectMessageService::DirectMessageService(Config && c, const Server & s):
{
server.peerList().onUpdate(std::bind(&DirectMessageService::peerWatcher, this,
std::placeholders::_1, std::placeholders::_2));
+
+ peerSyncRun = true;
+ peerSyncThread = std::thread(&DirectMessageService::doSyncWithPeers, this);
}
-DirectMessageService::~DirectMessageService() = default;
+DirectMessageService::~DirectMessageService()
+{
+ {
+ scoped_lock lock(peerSyncMutex);
+ peerSyncRun = false;
+ }
+ peerSyncCond.notify_all();
+ peerSyncThread.join();
+}
UUID DirectMessageService::uuid() const
{
@@ -469,7 +484,7 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads)
w(dmt, -1, -1);
if (auto netPeer = server.peer(peer))
- syncWithPeer(server.localHead(), dmt, *netPeer);
+ syncWithPeer(dmt, *netPeer);
}
prevState = move(state);
@@ -480,17 +495,48 @@ void DirectMessageService::peerWatcher(size_t, const class Peer * peer)
{
if (peer) {
if (auto pid = peer->identity()) {
- syncWithPeer(server.localHead(),
- thread(pid->finalOwner()), *peer);
+ syncWithPeer(thread(pid->finalOwner()), *peer);
+ }
+ }
+}
+
+void DirectMessageService::syncWithPeer(const DirectMessageThread & thread, const Peer & peer)
+{
+ {
+ scoped_lock lock(peerSyncMutex);
+ peerSyncQueue.emplace_back(thread, peer);
+ }
+ peerSyncCond.notify_one();
+}
+
+void DirectMessageService::doSyncWithPeers()
+{
+ unique_lock lock(peerSyncMutex);
+
+ while (peerSyncRun)
+ {
+ if (peerSyncQueue.empty()) {
+ peerSyncCond.wait(lock);
+ continue;
}
+
+ auto & [ thread, peer ] = peerSyncQueue.front();
+ lock.unlock();
+
+ doSyncWithPeer(thread, peer);
+
+ lock.lock();
+ peerSyncQueue.pop_front();
}
}
-void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer)
+void DirectMessageService::doSyncWithPeer(const DirectMessageThread & thread, const Peer & peer)
{
for (const auto & msg : thread.p->head)
- peer.send(myUUID, msg.ref());
+ if (not peer.send(myUUID, msg.ref()))
+ return;
+ const auto & head = server.localHead();
head.update([&](const Stored<LocalState> & loc) {
auto st = head.storage();
diff --git a/src/message.h b/src/message.h
index f729700..22da0fd 100644
--- a/src/message.h
+++ b/src/message.h
@@ -5,6 +5,7 @@
#include <erebos/storage.h>
#include <erebos/time.h>
+#include <chrono>
#include <mutex>
#include <vector>