diff options
Diffstat (limited to 'src/message.cpp')
-rw-r--r-- | src/message.cpp | 282 |
1 files changed, 230 insertions, 52 deletions
diff --git a/src/message.cpp b/src/message.cpp index 06ee8ad..8ae1601 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -6,42 +6,39 @@ using namespace erebos; using std::nullopt; using std::scoped_lock; -using std::unique_lock; static const UUID myUUID("c702076c-4928-4415-8b6b-3e839eafcb0d"); -static vector<DirectMessageThread> threadList; -static mutex threadLock; +DEFINE_SHARED_TYPE(DirectMessageThreads, + "ee793681-5976-466a-b0f0-4e1907d3fade", + &DirectMessageThreads::load, + [](const DirectMessageThreads & threads) { + return threads.store(); + }) -DirectMessageThread DirectMessageThread::Priv::getThreadLocked(const Identity & peer) -{ - for (const auto & t : threadList) - if (t.p->peer.sameAs(peer)) - return t; - DirectMessageThread t(new DirectMessageThread::Priv { - .peer = peer, - .head = {}, - }); - threadList.push_back(t); - return t; +static void findThreadComponents(vector<Stored<DirectMessageState>> & candidates, + const Stored<DirectMessageState> & cur, + const Identity & peer, + vector<Stored<DirectMessageData>> DirectMessageState::* sel) +{ + if (cur->peer && cur->peer->sameAs(peer) && not ((*cur).*sel).empty()) + candidates.push_back(cur); + else + for (const auto & p : cur->prev) + findThreadComponents(candidates, p, peer, sel); } -DirectMessageThread DirectMessageThread::Priv::updateThreadLocked(const Identity & peer, vector<Stored<DirectMessageData>> && head) +static vector<Stored<DirectMessageState>> findThreadComponents( + const vector<Stored<DirectMessageState>> & leaves, + const Identity & peer, + vector<Stored<DirectMessageData>> DirectMessageState::* sel) { - DirectMessageThread nt(new DirectMessageThread::Priv { - .peer = peer, - .head = std::move(head), - }); - - for (auto & t : threadList) - if (t.p->peer.sameAs(peer)) { - t = nt; - return nt; - } - - threadList.push_back(nt); - return nt; + vector<Stored<DirectMessageState>> candidates; + for (const auto & obj : leaves) + findThreadComponents(candidates, obj, peer, sel); + filterAncestors(candidates); + return candidates; } @@ -198,6 +195,111 @@ const Identity & DirectMessageThread::peer() const } +DirectMessageState DirectMessageState::load(const Ref & ref) +{ + if (auto rec = ref->asRecord()) { + return DirectMessageState { + .prev = rec->items("PREV").as<DirectMessageState>(), + .peer = Identity::load(rec->items("peer").asRef()), + + .sent = rec->items("sent").as<DirectMessageData>(), + .received = rec->items("received").as<DirectMessageData>(), + .seen = rec->items("seen").as<DirectMessageData>(), + }; + } + + return DirectMessageState(); +} + +Ref DirectMessageState::store(const Storage & st) const +{ + vector<Record::Item> items; + + for (const auto & prev : prev) + items.emplace_back("PREV", prev.ref()); + if (peer) + for (const auto & ref : peer->refs()) + items.emplace_back("peer", ref); + + for (const auto & x : sent) + items.emplace_back("sent", x.ref()); + for (const auto & x : received) + items.emplace_back("received", x.ref()); + for (const auto & x : seen) + items.emplace_back("seen", x.ref()); + + return st.storeObject(Record(std::move(items))); +} + + +DirectMessageThreads::DirectMessageThreads() = default; + +DirectMessageThreads::DirectMessageThreads(Stored<DirectMessageState> s): + DirectMessageThreads(vector<Stored<DirectMessageState>> { move(s) }) +{ +} + +DirectMessageThreads::DirectMessageThreads(vector<Stored<DirectMessageState>> s): + state(move(s)) +{ +} + +DirectMessageThreads DirectMessageThreads::load(const vector<Ref> & refs) +{ + DirectMessageThreads res; + res.state.reserve(refs.size()); + for (const auto & ref : refs) + res.state.push_back(Stored<DirectMessageState>::load(ref)); + return res; +} + +vector<Ref> DirectMessageThreads::store() const +{ + vector<Ref> refs; + refs.reserve(state.size()); + for (const auto & x : state) + refs.push_back(x.ref()); + return refs; +} + +vector<Stored<DirectMessageState>> DirectMessageThreads::data() const +{ + return state; +} + +bool DirectMessageThreads::operator==(const DirectMessageThreads & other) const +{ + return state == other.state; +} + +bool DirectMessageThreads::operator!=(const DirectMessageThreads & other) const +{ + return state != other.state; +} + +DirectMessageThread DirectMessageThreads::thread(const Identity & peer) const +{ + vector<Stored<DirectMessageData>> head; + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::sent)) + for (const auto & m : c->sent) + head.push_back(m); + for (const auto & c : findThreadComponents(state, peer, &DirectMessageState::received)) + for (const auto & m : c->received) + head.push_back(m); + filterAncestors(head); + + return new DirectMessageThread::Priv { + .peer = peer, + .head = move(head), + }; +} + +vector<Stored<DirectMessageState>> Mergeable<DirectMessageThreads>::components(const DirectMessageThreads & threads) +{ + return threads.data(); +} + + DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatcher w) { watchers.push_back(w); @@ -206,7 +308,9 @@ DirectMessageService::Config & DirectMessageService::Config::onUpdate(ThreadWatc DirectMessageService::DirectMessageService(Config && c, const Server & s): config(move(c)), - server(s) + server(s), + watched(server.localState().lens<SharedState>().lens<DirectMessageThreads>().watch( + std::bind(&DirectMessageService::updateHandler, this, std::placeholders::_1))) {} DirectMessageService::~DirectMessageService() = default; @@ -223,43 +327,73 @@ void DirectMessageService::handle(Context & ctx) return; auto powner = pid->finalOwner(); - unique_lock lock(threadLock); - - vector<Stored<DirectMessageData>> head(DirectMessageThread::Priv::getThreadLocked(powner).p->head); - head.push_back(Stored<DirectMessageData>::load(ctx.ref())); - filterAncestors(head); - auto dmt = DirectMessageThread::Priv::updateThreadLocked(powner, std::move(head)); - - lock.unlock(); - - for (const auto & w : config.watchers) - w(dmt, -1, -1); + auto msg = Stored<DirectMessageData>::load(ctx.ref()); + + server.localHead().update([&](const Stored<LocalState> & loc) { + auto st = loc.ref().storage(); + auto threads = loc->shared<DirectMessageThreads>(); + + vector<Stored<DirectMessageData>> receivedOld; + for (const auto & c : findThreadComponents(threads.state, powner, &DirectMessageState::received)) + for (const auto & m : c->received) + receivedOld.push_back(m); + auto receivedNew = receivedOld; + receivedNew.push_back(msg); + filterAncestors(receivedNew); + + if (receivedNew != receivedOld) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = powner, + .sent = {}, + .received = { msg }, + .seen = {}, + }); + + auto res = st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); + return res; + } else { + return loc; + } + }); } DirectMessageThread DirectMessageService::thread(const Identity & peer) { - scoped_lock lock(threadLock); - return DirectMessageThread::Priv::getThreadLocked(peer.finalOwner()); + return server.localState().get().shared<DirectMessageThreads>().thread(peer); } DirectMessage DirectMessageService::send(const Identity & to, const string & text) { - scoped_lock lock(threadLock); - - auto msg = server.localHead().ref().storage().store(DirectMessageData { - .prev = DirectMessageThread::Priv::getThreadLocked(to).p->head, - .from = server.identity().finalOwner(), - .time = ZonedTime::now(), - .text = text, + Stored<DirectMessageData> msg; + + server.localHead().update([&](const Stored<LocalState> & loc) { + auto st = loc.ref().storage(); + + auto threads = loc->shared<DirectMessageThreads>(); + msg = st.store(DirectMessageData { + .prev = threads.thread(to).p->head, + .from = server.identity().finalOwner(), + .time = ZonedTime::now(), + .text = text, + }); + + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = to, + .sent = { msg }, + .received = {}, + .seen = {}, + }); + + return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); }); - DirectMessageThread::Priv::updateThreadLocked(to, { msg }); - if (auto peer = server.peer(to)) peer->send(myUUID, msg.ref()); return DirectMessage(new DirectMessage::Priv { - .data = msg, + .data = move(msg), }); } @@ -276,3 +410,47 @@ DirectMessage DirectMessageService::send(const Peer & to, const string & text) return send(id->finalOwner(), text); throw std::runtime_error("peer without known identity"); } + +void DirectMessageService::updateHandler(const DirectMessageThreads & threads) +{ + scoped_lock lock(stateMutex); + + auto state = prevState; + for (const auto & s : threads.state) + state.push_back(s); + filterAncestors(state); + + if (state != prevState) { + auto queue = state; + vector<Identity> peers; + + while (not queue.empty()) { + auto cur = move(queue.back()); + queue.pop_back(); + + if (auto peer = cur->peer) { + bool found = false; + for (const auto & p : peers) { + if (p.sameAs(*peer)) { + found = true; + break; + } + } + + if (not found) + peers.push_back(*peer); + + for (const auto & prev : cur->prev) + queue.push_back(prev); + } + } + + for (const auto & peer : peers) { + auto dmt = threads.thread(peer); + for (const auto & w : config.watchers) + w(dmt, -1, -1); + } + + prevState = move(state); + } +} |