diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.cpp | 42 | ||||
-rw-r--r-- | src/message.cpp | 64 |
2 files changed, 81 insertions, 25 deletions
diff --git a/src/main.cpp b/src/main.cpp index 3100695..a0a9458 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -68,7 +68,7 @@ void printLine(const string & line) } Storage st(getErebosDir()); -optional<Head<LocalState>> h; +optional<Head<LocalState>> testHead; optional<Server> server; struct TestPeer @@ -109,7 +109,7 @@ Contact getContact(const string & id) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { if (string(c.leastRoot()) == id) { return c; } @@ -227,14 +227,14 @@ void createIdentity(const vector<string> & args) } if (identity) { - auto nh = h->update([&identity] (const auto & loc) { + auto nh = testHead->update([&identity] (const auto & loc) { auto ret = loc->identity(*identity); if (identity->owner()) ret = ret.template shared<optional<Identity>>(identity->finalOwner()); return st.store(ret); }); if (nh) - *h = *nh; + *testHead = *nh; } } @@ -309,7 +309,7 @@ void startServer(const vector<string> &) config.service<SyncService>(); - server.emplace(*h, move(config)); + server.emplace(*testHead, move(config)); server->peerList().onUpdate([](size_t idx, const Peer * peer) { size_t i = 0; @@ -364,7 +364,7 @@ void sharedStateGet(const vector<string> &) { ostringstream ss; ss << "shared-state-get"; - for (const auto & r : h->behavior().lens<vector<Ref>>().get()) + for (const auto & r : testHead->behavior().lens<vector<Ref>>().get()) ss << " " << string(r.digest()); printLine(ss.str()); } @@ -379,7 +379,7 @@ void sharedStateWait(const vector<string> & args) }; auto watchedPtr = make_shared<SharedStateWait>(); - auto watched = h->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) { + auto watched = testHead->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) { vector<Stored<Object>> objs; objs.reserve(refs.size()); for (const auto & r : refs) @@ -413,7 +413,7 @@ void sharedStateWait(const vector<string> & args) void watchLocalIdentity(const vector<string> &) { - auto bhv = h->behavior().lens<optional<Identity>>(); + auto bhv = testHead->behavior().lens<optional<Identity>>(); static auto watchedLocalIdentity = bhv.watch([] (const optional<Identity> & idt) { if (idt) { ostringstream ss; @@ -431,7 +431,7 @@ void watchLocalIdentity(const vector<string> &) void watchSharedIdentity(const vector<string> &) { - auto bhv = h->behavior().lens<SharedState>().lens<optional<Identity>>(); + auto bhv = testHead->behavior().lens<SharedState>().lens<optional<Identity>>(); static auto watchedSharedIdentity = bhv.watch([] (const optional<Identity> & idt) { if (idt) { ostringstream ss; @@ -449,7 +449,7 @@ void updateLocalIdentity(const vector<string> & params) throw invalid_argument("usage: update-local-identity <name>"); } - auto nh = h->update([¶ms] (const Stored<LocalState> & loc) { + auto nh = testHead->update([¶ms] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto b = loc->identity()->modify(); @@ -457,7 +457,7 @@ void updateLocalIdentity(const vector<string> & params) return st.store(loc->identity(b.commit())); }); if (nh) - *h = *nh; + *testHead = *nh; } void updateSharedIdentity(const vector<string> & params) @@ -466,7 +466,7 @@ void updateSharedIdentity(const vector<string> & params) throw invalid_argument("usage: update-shared-identity <name>"); } - auto nh = h->update([¶ms] (const Stored<LocalState> & loc) { + auto nh = testHead->update([¶ms] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto mbid = loc->shared<optional<Identity>>(); if (!mbid) @@ -477,7 +477,7 @@ void updateSharedIdentity(const vector<string> & params) return st.store(loc->shared<optional<Identity>>(optional(b.commit()))); }); if (nh) - *h = *nh; + *testHead = *nh; } void attachTo(const vector<string> & params) @@ -515,7 +515,7 @@ void contactList(const vector<string> &) auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; - for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { + for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) { ostringstream ss; ss << "contact-list-item " << string(c.leastRoot()) << " " << c.name(); if (auto id = c.identity()) @@ -532,28 +532,30 @@ void contactSetName(const vector<string> & args) auto name = args.at(1); auto c = getContact(id); - auto nh = h->update([&] (const Stored<LocalState> & loc) { + auto nh = testHead->update([&] (const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto cc = c.customName(st, name); auto contacts = loc->shared<Set<Contact>>(); return st.store(loc->shared<Set<Contact>>(contacts.add(st, cc))); }); if (nh) - *h = *nh; + *testHead = *nh; printLine("contact-set-name-done"); } void dmSendPeer(const vector<string> & args) { - server->svc<DirectMessageService>().send( + DirectMessageService::send( + *testHead, getPeer(args.at(0)).peer, args.at(1)); } void dmSendContact(const vector<string> & args) { - server->svc<DirectMessageService>().send( + DirectMessageService::send( + *testHead, getContact(args.at(0)), args.at(1)); } @@ -562,7 +564,7 @@ template<class T> static void dmList(const T & peer) { if (auto id = peer.identity()) - for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) { + for (const auto & msg : testHead->behavior().get().shared<DirectMessageThreads>().thread(*id)) { string name = "<unnamed>"; if (const auto & from = msg.from()) if (const auto & opt = from->name()) @@ -622,7 +624,7 @@ vector<Command> commands = { int main(int argc, char * argv[]) { - h.emplace([] { + testHead.emplace([] { auto hs = st.heads<LocalState>(); if (!hs.empty()) return hs[0]; diff --git a/src/message.cpp b/src/message.cpp index 1ee362a..389807b 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -364,11 +364,11 @@ DirectMessageThread DirectMessageService::thread(const Identity & peer) return server.localState().get().shared<DirectMessageThreads>().thread(peer); } -DirectMessage DirectMessageService::send(const Identity & to, const string & text) +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Identity & to, const string & text) { Stored<DirectMessageData> msg; - server.localHead().update([&](const Stored<LocalState> & loc) { + head.update([&](const Stored<LocalState> & loc) { auto st = loc.ref().storage(); auto threads = loc->shared<DirectMessageThreads>(); @@ -388,14 +388,30 @@ DirectMessage DirectMessageService::send(const Identity & to, const string & tex return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); }); - if (auto peer = server.peer(to)) - peer->send(myUUID, msg.ref()); - return DirectMessage(new DirectMessage::Priv { .data = move(msg), }); } +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Contact & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, *id, text); + throw std::runtime_error("contact without erebos identity"); +} + +DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Peer & to, const string & text) +{ + if (auto id = to.identity()) + return send(head, id->finalOwner(), text); + throw std::runtime_error("peer without known identity"); +} + +DirectMessage DirectMessageService::send(const Identity & to, const string & text) +{ + return send(server.localHead(), to, text); +} + DirectMessage DirectMessageService::send(const Contact & to, const string & text) { if (auto id = to.identity()) @@ -448,8 +464,46 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads) auto dmt = threads.thread(peer); for (const auto & w : config.watchers) w(dmt, -1, -1); + + if (auto netPeer = server.peer(peer)) + syncWithPeer(server.localHead(), dmt, *netPeer); } prevState = move(state); } } + +void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer) +{ + for (const auto & msg : thread.p->head) + peer.send(myUUID, msg.ref()); + + head.update([&](const Stored<LocalState> & loc) { + auto st = head.storage(); + + auto threads = loc->shared<DirectMessageThreads>(); + + vector<Stored<DirectMessageData>> oldSent; + for (const auto & c : findThreadComponents(threads.data(), thread.peer(), &DirectMessageState::sent)) + for (const auto & m : c->sent) + oldSent.push_back(m); + filterAncestors(oldSent); + + auto newSent = oldSent; + for (const auto & msg : thread.p->head) + newSent.push_back(msg); + filterAncestors(newSent); + + if (newSent != oldSent) { + auto state = st.store(DirectMessageState { + .prev = threads.data(), + .peer = thread.peer(), + .sent = move(newSent), + }); + + return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state))); + } + + return loc; + }); +} |