diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-01-07 20:38:51 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-01-08 22:15:21 +0100 |
commit | 56b6a56b5d14781cd24e38860c082bfdab96c918 (patch) | |
tree | fa8fc33960d19d8ef2bdce57db6f6697f70c8dc3 | |
parent | 6e6836e7885259b731651ae172bd6313edae7cdd (diff) |
Message: send messages as sync from shared state
-rw-r--r-- | include/erebos/message.h | 5 | ||||
-rw-r--r-- | src/main.cpp | 42 | ||||
-rw-r--r-- | src/message.cpp | 64 | ||||
-rw-r--r-- | test/message.test | 29 |
4 files changed, 111 insertions, 29 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h index 194044c..6ad6e54 100644 --- a/include/erebos/message.h +++ b/include/erebos/message.h @@ -131,12 +131,17 @@ public: DirectMessageThread thread(const Identity &); + static DirectMessage send(const Head<LocalState> &, const Identity &, const std::string &); + static DirectMessage send(const Head<LocalState> &, const Contact &, const std::string &); + static DirectMessage send(const Head<LocalState> &, const Peer &, const std::string &); + DirectMessage send(const Identity &, const std::string &); DirectMessage send(const Contact &, const std::string &); DirectMessage send(const Peer &, const std::string &); private: void updateHandler(const DirectMessageThreads &); + static void syncWithPeer(const Head<LocalState> &, const DirectMessageThread &, const Peer &); const Config config; const Server & server; diff --git a/src/main.cpp b/src/main.cpp index 3100695..a0a9458 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -68,7 +68,7 @@ void printLine(const string & line) } Storage st(getErebosDir()); -optional<Head<LocalState>> h; +optional<Head<LocalState>> testHead; optional<Server> server; struct TestPeer @@ -109,7 +109,7 @@ Contact getContact(const string & id) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { if (string(c.leastRoot()) == id) { return c; } @@ -227,14 +227,14 @@ void createIdentity(const vector<string> & args) } if (identity) { - auto nh = h->update([&identity] (const auto & loc) { + auto nh = testHead->update([&identity] (const auto & loc) { auto ret = loc->identity(*identity); if (identity->owner()) ret = ret.template shared<optional<Identity>>(identity->finalOwner()); return st.store(ret); }); if (nh) - *h = *nh; + *testHead = *nh; } } @@ -309,7 +309,7 @@ void startServer(const vector<string> &) config.service<SyncService>(); - server.emplace(*h, move(config)); + server.emplace(*testHead, move(config)); server->peerList().onUpdate([](size_t idx, const Peer * peer) { size_t i = 0; @@ -364,7 +364,7 @@ void sharedStateGet(const vector<string> &) { ostringstream ss; ss << "shared-state-get"; - for (const auto & r : h->behavior().lens<vector<Ref>>().get()) + for (const auto & r : testHead->behavior().lens<vector<Ref>>().get()) ss << " " << string(r.digest()); printLine(ss.str()); } @@ -379,7 +379,7 @@ void sharedStateWait(const vector<string> & args) }; auto watchedPtr = make_shared<SharedStateWait>(); - auto watched = h->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) { + auto watched = testHead->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) { vector<Stored<Object>> objs; objs.reserve(refs.size()); for (const auto & r : refs) @@ -413,7 +413,7 @@ void sharedStateWait(const vector<string> & args) void watchLocalIdentity(const vector<string> &) { - auto bhv = h->behavior().lens<optional<Identity>>(); + auto bhv = testHead->behavior().lens<optional<Identity>>(); static auto watchedLocalIdentity = bhv.watch([] (const optional<Identity> & idt) { if (idt) { ostringstream ss; @@ -431,7 +431,7 @@ void watchLocalIdentity(const vector<string> &) void watchSharedIdentity(const vector<string> &) { - auto bhv = h->behavior().lens<SharedState>().lens<optional<Identity>>(); + auto bhv = testHead->behavior().lens<SharedState>().lens<optional<Identity>>(); static auto watchedSharedIdentity = bhv.watch([] (const optional<Identity> & idt) { if (idt) { ostringstream ss; @@ -449,7 +449,7 @@ void updateLocalIdentity(const vector<string> & params) throw invalid_argument("usage: update-local-identity <name>"); } - auto nh = h->update([¶ms] (const Stored<LocalState> & loc) { + auto nh = testHead->update([¶ms] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto b = loc->identity()->modify(); @@ -457,7 +457,7 @@ void updateLocalIdentity(const vector<string> & params) return st.store(loc->identity(b.commit())); }); if (nh) - *h = *nh; + *testHead = *nh; } void updateSharedIdentity(const vector<string> & params) @@ -466,7 +466,7 @@ void updateSharedIdentity(const vector<string> & params) throw invalid_argument("usage: update-shared-identity <name>"); } - auto nh = h->update([¶ms] (const Stored<LocalState> & loc) { + auto nh = testHead->update([¶ms] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto mbid = loc->shared<optional<Identity>>(); if (!mbid) @@ -477,7 +477,7 @@ void updateSharedIdentity(const vector<string> & params) return st.store(loc->shared<optional<Identity>>(optional(b.commit()))); }); if (nh) - *h = *nh; + *testHead = *nh; } void attachTo(const vector<string> & params) @@ -515,7 +515,7 @@ void contactList(const vector<string> &) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { ostringstream ss; ss << "contact-list-item " << string(c.leastRoot()) << " " << c.name(); if (auto id = c.identity()) @@ -532,28 +532,30 @@ void contactSetName(const vector<string> & args) auto name = args.at(1); auto c = getContact(id); - auto nh = h->update([&] (const Stored<LocalState> & loc) { + auto nh = testHead->update([&] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto cc = c.customName(st, name); auto contacts = loc->shared<Set<Contact>>(); return st.store(loc->shared<Set<Contact>>(contacts.add(st, cc))); }); if (nh) - *h = *nh; + *testHead = *nh; printLine("contact-set-name-done"); } void dmSendPeer(const vector<string> & args) { - server->svc<DirectMessageService>().send( + DirectMessageService::send( + *testHead, getPeer(args.at(0)).peer, args.at(1)); } void dmSendContact(const vector<string> & args) { - server->svc<DirectMessageService>().send( + DirectMessageService::send( + *testHead, getContact(args.at(0)), args.at(1)); } @@ -562,7 +564,7 @@ template<class T> static void dmList(const T & peer) { if (auto id = peer.identity()) - for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) { + for (const auto & msg : testHead->behavior().get().shared<DirectMessageThreads>().thread(*id)) { string name = "<unnamed>"; if (const auto & from = msg.from()) if (const auto & opt = from->name()) @@ -622,7 +624,7 @@ vector<Command> commands = { int main(int argc, char * argv[]) { - h.emplace([] { + testHead.emplace([] { auto hs = st.heads<LocalState>(); if (!hs.empty()) return hs[0]; diff --git a/src/message.cpp b/src/message.cpp index 1ee362a..389807b 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -364,11 +364,11 @@ DirectMessageThread DirectMessageService::thread(const Identity & peer) return server.localState().get().shared<DirectMessageThreads>().thread(peer); } -DirectMessage DirectMessageService::send(const Identity & to, const string & text) +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Identity & to, const string & text) { Stored<DirectMessageData> msg; - server.localHead().update([&](const Stored<LocalState> & loc) { + head.update([&](const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto threads = loc->shared<DirectMessageThreads>(); @@ -388,14 +388,30 @@ DirectMessage DirectMessageService::send(const Identity & to, const string & tex return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); }); - if (auto peer = server.peer(to)) - peer->send(myUUID, msg.ref()); - return DirectMessage(new DirectMessage::Priv { .data = move(msg), }); } +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Contact & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, *id, text); + throw std::runtime_error("contact without erebos identity"); +} + +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Peer & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, id->finalOwner(), text); + throw std::runtime_error("peer without known identity"); +} + +DirectMessage DirectMessageService::send(const Identity & to, const string & text) +{ + return send(server.localHead(), to, text); +} + DirectMessage DirectMessageService::send(const Contact & to, const string & text) { if (auto id = to.identity()) @@ -448,8 +464,46 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads) auto dmt = threads.thread(peer); for (const auto & w : config.watchers) w(dmt, -1, -1); + + if (auto netPeer = server.peer(peer)) + syncWithPeer(server.localHead(), dmt, *netPeer); } prevState = move(state); } } + +void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer) +{ + for (const auto & msg : thread.p->head) + peer.send(myUUID, msg.ref()); + + head.update([&](const Stored<LocalState> & loc) { + auto st = head.storage(); + + auto threads = loc->shared<DirectMessageThreads>(); + + vector<Stored<DirectMessageData>> oldSent; + for (const auto & c : findThreadComponents(threads.data(), thread.peer(), &DirectMessageState::sent)) + for (const auto & m : c->sent) + oldSent.push_back(m); + filterAncestors(oldSent); + + auto newSent = oldSent; + for (const auto & msg : thread.p->head) + newSent.push_back(msg); + filterAncestors(newSent); + + if (newSent != oldSent) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = thread.peer(), + .sent = move(newSent), + }); + + return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); + } + + return loc; + }); +} diff --git a/test/message.test b/test/message.test index e317116..f630c59 100644 --- a/test/message.test +++ b/test/message.test @@ -88,11 +88,17 @@ test DirectMessage: # Reload message history - with p1: - send "stop-server" - expect /stop-server-done/ - send "start-server" + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + for p in [p1, p2]: + with p: + send "start-server" + with p1: send "contact-list" expect: /contact-list-item $c1_2 Owner2 Owner2/ @@ -111,3 +117,18 @@ test DirectMessage: expect /dm-list-item from Owner2 text hi$i/ expect /dm-list-(.*)/ capture done guard (done == "done") + + # Send message while offline + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + send "start-server" to p2 + + send "dm-send-contact $c1_2 while_offline" to p1 + send "start-server" to p1 + + expect /dm-received from Owner1 text while_offline/ from p2 |