diff options
-rw-r--r-- | include/erebos/message.h | 17 | ||||
-rw-r--r-- | src/message.cpp | 58 | ||||
-rw-r--r-- | src/message.h | 1 |
3 files changed, 68 insertions, 8 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h index a74e6a3..b52b84b 100644 --- a/include/erebos/message.h +++ b/include/erebos/message.h @@ -3,16 +3,21 @@ #include <erebos/merge.h> #include <erebos/service.h> -#include <chrono> +#include <condition_variable> +#include <deque> #include <functional> #include <memory> #include <mutex> #include <optional> #include <string> +#include <tuple> namespace erebos { +using std::condition_variable; +using std::deque; using std::mutex; +using std::tuple; using std::unique_ptr; class Contact; @@ -142,7 +147,9 @@ public: private: void updateHandler(const DirectMessageThreads &); void peerWatcher(size_t, const class Peer *); - static void syncWithPeer(const Head<LocalState> &, const DirectMessageThread &, const Peer &); + void syncWithPeer(const DirectMessageThread &, const Peer &); + void doSyncWithPeers(); + void doSyncWithPeer(const DirectMessageThread &, const Peer &); const Config config; const Server & server; @@ -150,6 +157,12 @@ private: vector<Stored<DirectMessageState>> prevState; mutex stateMutex; + mutex peerSyncMutex; + condition_variable peerSyncCond; + bool peerSyncRun; + deque<tuple<DirectMessageThread, Peer>> peerSyncQueue; + std::thread peerSyncThread; + Watched<DirectMessageThreads> watched; }; diff --git a/src/message.cpp b/src/message.cpp index 54c080e..349accb 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -3,9 +3,13 @@ #include <erebos/contact.h> #include <erebos/network.h> +#include <iostream> +#include <thread> + using namespace erebos; using std::nullopt; using std::scoped_lock; +using std::unique_lock; static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); @@ -317,9 +321,20 @@ DirectMessageService::DirectMessageService(Config && c, const Server & s): { server.peerList().onUpdate(std::bind(&DirectMessageService::peerWatcher, this, std::placeholders::_1, std::placeholders::_2)); + + peerSyncRun = true; + peerSyncThread = std::thread(&DirectMessageService::doSyncWithPeers, this); } -DirectMessageService::~DirectMessageService() = default; +DirectMessageService::~DirectMessageService() +{ + { + scoped_lock lock(peerSyncMutex); + peerSyncRun = false; + } + peerSyncCond.notify_all(); + peerSyncThread.join(); +} UUID DirectMessageService::uuid() const { @@ -469,7 +484,7 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads) w(dmt, -1, -1); if (auto netPeer = server.peer(peer)) - syncWithPeer(server.localHead(), dmt, *netPeer); + syncWithPeer(dmt, *netPeer); } prevState = move(state); @@ -480,17 +495,48 @@ void DirectMessageService::peerWatcher(size_t, const class Peer * peer) { if (peer) { if (auto pid = peer->identity()) { - syncWithPeer(server.localHead(), - thread(pid->finalOwner()), *peer); + syncWithPeer(thread(pid->finalOwner()), *peer); + } + } +} + +void DirectMessageService::syncWithPeer(const DirectMessageThread & thread, const Peer & peer) +{ + { + scoped_lock lock(peerSyncMutex); + peerSyncQueue.emplace_back(thread, peer); + } + peerSyncCond.notify_one(); +} + +void DirectMessageService::doSyncWithPeers() +{ + unique_lock lock(peerSyncMutex); + + while (peerSyncRun) + { + if (peerSyncQueue.empty()) { + peerSyncCond.wait(lock); + continue; } + + auto & [ thread, peer ] = peerSyncQueue.front(); + lock.unlock(); + + doSyncWithPeer(thread, peer); + + lock.lock(); + peerSyncQueue.pop_front(); } } -void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer) +void DirectMessageService::doSyncWithPeer(const DirectMessageThread & thread, const Peer & peer) { for (const auto & msg : thread.p->head) - peer.send(myUUID, msg.ref()); + if (not peer.send(myUUID, msg.ref())) + return; + const auto & head = server.localHead(); head.update([&](const Stored<LocalState> & loc) { auto st = head.storage(); diff --git a/src/message.h b/src/message.h index f729700..22da0fd 100644 --- a/src/message.h +++ b/src/message.h @@ -5,6 +5,7 @@ #include <erebos/storage.h> #include <erebos/time.h> +#include <chrono> #include <mutex> #include <vector> |