From 56b6a56b5d14781cd24e38860c082bfdab96c918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 7 Jan 2024 20:38:51 +0100 Subject: Message: send messages as sync from shared state --- src/main.cpp | 42 +++++++++++++++++++------------------ src/message.cpp | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/main.cpp b/src/main.cpp index 3100695..a0a9458 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -68,7 +68,7 @@ void printLine(const string & line) } Storage st(getErebosDir()); -optional> h; +optional> testHead; optional server; struct TestPeer @@ -109,7 +109,7 @@ Contact getContact(const string & id) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens().lens>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens().lens>().get().view(cmp)) { if (string(c.leastRoot()) == id) { return c; } @@ -227,14 +227,14 @@ void createIdentity(const vector & args) } if (identity) { - auto nh = h->update([&identity] (const auto & loc) { + auto nh = testHead->update([&identity] (const auto & loc) { auto ret = loc->identity(*identity); if (identity->owner()) ret = ret.template shared>(identity->finalOwner()); return st.store(ret); }); if (nh) - *h = *nh; + *testHead = *nh; } } @@ -309,7 +309,7 @@ void startServer(const vector &) config.service(); - server.emplace(*h, move(config)); + server.emplace(*testHead, move(config)); server->peerList().onUpdate([](size_t idx, const Peer * peer) { size_t i = 0; @@ -364,7 +364,7 @@ void sharedStateGet(const vector &) { ostringstream ss; ss << "shared-state-get"; - for (const auto & r : h->behavior().lens>().get()) + for (const auto & r : testHead->behavior().lens>().get()) ss << " " << string(r.digest()); printLine(ss.str()); } @@ -379,7 +379,7 @@ void sharedStateWait(const vector & args) }; auto watchedPtr = make_shared(); - auto watched = h->behavior().lens>().watch([args, watchedPtr] (const vector & refs) { + auto watched = testHead->behavior().lens>().watch([args, watchedPtr] (const vector & refs) { vector> objs; objs.reserve(refs.size()); for (const auto & r : refs) @@ -413,7 +413,7 @@ void sharedStateWait(const vector & args) void watchLocalIdentity(const vector &) { - auto bhv = h->behavior().lens>(); + auto bhv = testHead->behavior().lens>(); static auto watchedLocalIdentity = bhv.watch([] (const optional & idt) { if (idt) { ostringstream ss; @@ -431,7 +431,7 @@ void watchLocalIdentity(const vector &) void watchSharedIdentity(const vector &) { - auto bhv = h->behavior().lens().lens>(); + auto bhv = testHead->behavior().lens().lens>(); static auto watchedSharedIdentity = bhv.watch([] (const optional & idt) { if (idt) { ostringstream ss; @@ -449,7 +449,7 @@ void updateLocalIdentity(const vector & params) throw invalid_argument("usage: update-local-identity "); } - auto nh = h->update([¶ms] (const Stored & loc) { + auto nh = testHead->update([¶ms] (const Stored & loc) { auto st = loc.ref().storage(); auto b = loc->identity()->modify(); @@ -457,7 +457,7 @@ void updateLocalIdentity(const vector & params) return st.store(loc->identity(b.commit())); }); if (nh) - *h = *nh; + *testHead = *nh; } void updateSharedIdentity(const vector & params) @@ -466,7 +466,7 @@ void updateSharedIdentity(const vector & params) throw invalid_argument("usage: update-shared-identity "); } - auto nh = h->update([¶ms] (const Stored & loc) { + auto nh = testHead->update([¶ms] (const Stored & loc) { auto st = loc.ref().storage(); auto mbid = loc->shared>(); if (!mbid) @@ -477,7 +477,7 @@ void updateSharedIdentity(const vector & params) return st.store(loc->shared>(optional(b.commit()))); }); if (nh) - *h = *nh; + *testHead = *nh; } void attachTo(const vector & params) @@ -515,7 +515,7 @@ void contactList(const vector &) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens().lens>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens().lens>().get().view(cmp)) { ostringstream ss; ss << "contact-list-item " << string(c.leastRoot()) << " " << c.name(); if (auto id = c.identity()) @@ -532,28 +532,30 @@ void contactSetName(const vector & args) auto name = args.at(1); auto c = getContact(id); - auto nh = h->update([&] (const Stored & loc) { + auto nh = testHead->update([&] (const Stored & loc) { auto st = loc.ref().storage(); auto cc = c.customName(st, name); auto contacts = loc->shared>(); return st.store(loc->shared>(contacts.add(st, cc))); }); if (nh) - *h = *nh; + *testHead = *nh; printLine("contact-set-name-done"); } void dmSendPeer(const vector & args) { - server->svc().send( + DirectMessageService::send( + *testHead, getPeer(args.at(0)).peer, args.at(1)); } void dmSendContact(const vector & args) { - server->svc().send( + DirectMessageService::send( + *testHead, getContact(args.at(0)), args.at(1)); } @@ -562,7 +564,7 @@ template static void dmList(const T & peer) { if (auto id = peer.identity()) - for (const auto & msg : h->behavior().get().shared().thread(*id)) { + for (const auto & msg : testHead->behavior().get().shared().thread(*id)) { string name = ""; if (const auto & from = msg.from()) if (const auto & opt = from->name()) @@ -622,7 +624,7 @@ vector commands = { int main(int argc, char * argv[]) { - h.emplace([] { + testHead.emplace([] { auto hs = st.heads(); if (!hs.empty()) return hs[0]; diff --git a/src/message.cpp b/src/message.cpp index 1ee362a..389807b 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -364,11 +364,11 @@ DirectMessageThread DirectMessageService::thread(const Identity & peer) return server.localState().get().shared().thread(peer); } -DirectMessage DirectMessageService::send(const Identity & to, const string & text) +DirectMessage DirectMessageService::send(const Head & head, const Identity & to, const string & text) { Stored msg; - server.localHead().update([&](const Stored & loc) { + head.update([&](const Stored & loc) { auto st = loc.ref().storage(); auto threads = loc->shared(); @@ -388,14 +388,30 @@ DirectMessage DirectMessageService::send(const Identity & to, const string & tex return st.store(loc->shared(DirectMessageThreads(state))); }); - if (auto peer = server.peer(to)) - peer->send(myUUID, msg.ref()); - return DirectMessage(new DirectMessage::Priv { .data = move(msg), }); } +DirectMessage DirectMessageService::send(const Head & head, const Contact & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, *id, text); + throw std::runtime_error("contact without erebos identity"); +} + +DirectMessage DirectMessageService::send(const Head & head, const Peer & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, id->finalOwner(), text); + throw std::runtime_error("peer without known identity"); +} + +DirectMessage DirectMessageService::send(const Identity & to, const string & text) +{ + return send(server.localHead(), to, text); +} + DirectMessage DirectMessageService::send(const Contact & to, const string & text) { if (auto id = to.identity()) @@ -448,8 +464,46 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads) auto dmt = threads.thread(peer); for (const auto & w : config.watchers) w(dmt, -1, -1); + + if (auto netPeer = server.peer(peer)) + syncWithPeer(server.localHead(), dmt, *netPeer); } prevState = move(state); } } + +void DirectMessageService::syncWithPeer(const Head & head, const DirectMessageThread & thread, const Peer & peer) +{ + for (const auto & msg : thread.p->head) + peer.send(myUUID, msg.ref()); + + head.update([&](const Stored & loc) { + auto st = head.storage(); + + auto threads = loc->shared(); + + vector> oldSent; + for (const auto & c : findThreadComponents(threads.data(), thread.peer(), &DirectMessageState::sent)) + for (const auto & m : c->sent) + oldSent.push_back(m); + filterAncestors(oldSent); + + auto newSent = oldSent; + for (const auto & msg : thread.p->head) + newSent.push_back(msg); + filterAncestors(newSent); + + if (newSent != oldSent) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = thread.peer(), + .sent = move(newSent), + }); + + return st.store(loc->shared(DirectMessageThreads(state))); + } + + return loc; + }); +} -- cgit v1.2.3