summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-01-07 20:38:51 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2024-01-08 22:15:21 +0100
commit56b6a56b5d14781cd24e38860c082bfdab96c918 (patch)
treefa8fc33960d19d8ef2bdce57db6f6697f70c8dc3
parent6e6836e7885259b731651ae172bd6313edae7cdd (diff)
Message: send messages as sync from shared state
-rw-r--r--include/erebos/message.h5
-rw-r--r--src/main.cpp42
-rw-r--r--src/message.cpp64
-rw-r--r--test/message.test29
4 files changed, 111 insertions, 29 deletions
diff --git a/include/erebos/message.h b/include/erebos/message.h
index 194044c..6ad6e54 100644
--- a/include/erebos/message.h
+++ b/include/erebos/message.h
@@ -131,12 +131,17 @@ public:
DirectMessageThread thread(const Identity &);
+ static DirectMessage send(const Head<LocalState> &, const Identity &, const std::string &);
+ static DirectMessage send(const Head<LocalState> &, const Contact &, const std::string &);
+ static DirectMessage send(const Head<LocalState> &, const Peer &, const std::string &);
+
DirectMessage send(const Identity &, const std::string &);
DirectMessage send(const Contact &, const std::string &);
DirectMessage send(const Peer &, const std::string &);
private:
void updateHandler(const DirectMessageThreads &);
+ static void syncWithPeer(const Head<LocalState> &, const DirectMessageThread &, const Peer &);
const Config config;
const Server & server;
diff --git a/src/main.cpp b/src/main.cpp
index 3100695..a0a9458 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -68,7 +68,7 @@ void printLine(const string & line)
}
Storage st(getErebosDir());
-optional<Head<LocalState>> h;
+optional<Head<LocalState>> testHead;
optional<Server> server;
struct TestPeer
@@ -109,7 +109,7 @@ Contact getContact(const string & id)
auto cmp = [](const Contact & x, const Contact & y) {
return x.data() < y.data();
};
- for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) {
+ for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) {
if (string(c.leastRoot()) == id) {
return c;
}
@@ -227,14 +227,14 @@ void createIdentity(const vector<string> & args)
}
if (identity) {
- auto nh = h->update([&identity] (const auto & loc) {
+ auto nh = testHead->update([&identity] (const auto & loc) {
auto ret = loc->identity(*identity);
if (identity->owner())
ret = ret.template shared<optional<Identity>>(identity->finalOwner());
return st.store(ret);
});
if (nh)
- *h = *nh;
+ *testHead = *nh;
}
}
@@ -309,7 +309,7 @@ void startServer(const vector<string> &)
config.service<SyncService>();
- server.emplace(*h, move(config));
+ server.emplace(*testHead, move(config));
server->peerList().onUpdate([](size_t idx, const Peer * peer) {
size_t i = 0;
@@ -364,7 +364,7 @@ void sharedStateGet(const vector<string> &)
{
ostringstream ss;
ss << "shared-state-get";
- for (const auto & r : h->behavior().lens<vector<Ref>>().get())
+ for (const auto & r : testHead->behavior().lens<vector<Ref>>().get())
ss << " " << string(r.digest());
printLine(ss.str());
}
@@ -379,7 +379,7 @@ void sharedStateWait(const vector<string> & args)
};
auto watchedPtr = make_shared<SharedStateWait>();
- auto watched = h->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) {
+ auto watched = testHead->behavior().lens<vector<Ref>>().watch([args, watchedPtr] (const vector<Ref> & refs) {
vector<Stored<Object>> objs;
objs.reserve(refs.size());
for (const auto & r : refs)
@@ -413,7 +413,7 @@ void sharedStateWait(const vector<string> & args)
void watchLocalIdentity(const vector<string> &)
{
- auto bhv = h->behavior().lens<optional<Identity>>();
+ auto bhv = testHead->behavior().lens<optional<Identity>>();
static auto watchedLocalIdentity = bhv.watch([] (const optional<Identity> & idt) {
if (idt) {
ostringstream ss;
@@ -431,7 +431,7 @@ void watchLocalIdentity(const vector<string> &)
void watchSharedIdentity(const vector<string> &)
{
- auto bhv = h->behavior().lens<SharedState>().lens<optional<Identity>>();
+ auto bhv = testHead->behavior().lens<SharedState>().lens<optional<Identity>>();
static auto watchedSharedIdentity = bhv.watch([] (const optional<Identity> & idt) {
if (idt) {
ostringstream ss;
@@ -449,7 +449,7 @@ void updateLocalIdentity(const vector<string> & params)
throw invalid_argument("usage: update-local-identity <name>");
}
- auto nh = h->update([&params] (const Stored<LocalState> & loc) {
+ auto nh = testHead->update([&params] (const Stored<LocalState> & loc) {
auto st = loc.ref().storage();
auto b = loc->identity()->modify();
@@ -457,7 +457,7 @@ void updateLocalIdentity(const vector<string> & params)
return st.store(loc->identity(b.commit()));
});
if (nh)
- *h = *nh;
+ *testHead = *nh;
}
void updateSharedIdentity(const vector<string> & params)
@@ -466,7 +466,7 @@ void updateSharedIdentity(const vector<string> & params)
throw invalid_argument("usage: update-shared-identity <name>");
}
- auto nh = h->update([&params] (const Stored<LocalState> & loc) {
+ auto nh = testHead->update([&params] (const Stored<LocalState> & loc) {
auto st = loc.ref().storage();
auto mbid = loc->shared<optional<Identity>>();
if (!mbid)
@@ -477,7 +477,7 @@ void updateSharedIdentity(const vector<string> & params)
return st.store(loc->shared<optional<Identity>>(optional(b.commit())));
});
if (nh)
- *h = *nh;
+ *testHead = *nh;
}
void attachTo(const vector<string> & params)
@@ -515,7 +515,7 @@ void contactList(const vector<string> &)
auto cmp = [](const Contact & x, const Contact & y) {
return x.data() < y.data();
};
- for (const auto & c : h->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) {
+ for (const auto & c : testHead->behavior().lens<SharedState>().lens<Set<Contact>>().get().view(cmp)) {
ostringstream ss;
ss << "contact-list-item " << string(c.leastRoot()) << " " << c.name();
if (auto id = c.identity())
@@ -532,28 +532,30 @@ void contactSetName(const vector<string> & args)
auto name = args.at(1);
auto c = getContact(id);
- auto nh = h->update([&] (const Stored<LocalState> & loc) {
+ auto nh = testHead->update([&] (const Stored<LocalState> & loc) {
auto st = loc.ref().storage();
auto cc = c.customName(st, name);
auto contacts = loc->shared<Set<Contact>>();
return st.store(loc->shared<Set<Contact>>(contacts.add(st, cc)));
});
if (nh)
- *h = *nh;
+ *testHead = *nh;
printLine("contact-set-name-done");
}
void dmSendPeer(const vector<string> & args)
{
- server->svc<DirectMessageService>().send(
+ DirectMessageService::send(
+ *testHead,
getPeer(args.at(0)).peer,
args.at(1));
}
void dmSendContact(const vector<string> & args)
{
- server->svc<DirectMessageService>().send(
+ DirectMessageService::send(
+ *testHead,
getContact(args.at(0)),
args.at(1));
}
@@ -562,7 +564,7 @@ template<class T>
static void dmList(const T & peer)
{
if (auto id = peer.identity())
- for (const auto & msg : h->behavior().get().shared<DirectMessageThreads>().thread(*id)) {
+ for (const auto & msg : testHead->behavior().get().shared<DirectMessageThreads>().thread(*id)) {
string name = "<unnamed>";
if (const auto & from = msg.from())
if (const auto & opt = from->name())
@@ -622,7 +624,7 @@ vector<Command> commands = {
int main(int argc, char * argv[])
{
- h.emplace([] {
+ testHead.emplace([] {
auto hs = st.heads<LocalState>();
if (!hs.empty())
return hs[0];
diff --git a/src/message.cpp b/src/message.cpp
index 1ee362a..389807b 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -364,11 +364,11 @@ DirectMessageThread DirectMessageService::thread(const Identity & peer)
return server.localState().get().shared<DirectMessageThreads>().thread(peer);
}
-DirectMessage DirectMessageService::send(const Identity & to, const string & text)
+DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Identity & to, const string & text)
{
Stored<DirectMessageData> msg;
- server.localHead().update([&](const Stored<LocalState> & loc) {
+ head.update([&](const Stored<LocalState> & loc) {
auto st = loc.ref().storage();
auto threads = loc->shared<DirectMessageThreads>();
@@ -388,14 +388,30 @@ DirectMessage DirectMessageService::send(const Identity & to, const string & tex
return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
});
- if (auto peer = server.peer(to))
- peer->send(myUUID, msg.ref());
-
return DirectMessage(new DirectMessage::Priv {
.data = move(msg),
});
}
+DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Contact & to, const string & text)
+{
+ if (auto id = to.identity())
+ return send(head, *id, text);
+ throw std::runtime_error("contact without erebos identity");
+}
+
+DirectMessage DirectMessageService::send(const Head<LocalState> & head, const Peer & to, const string & text)
+{
+ if (auto id = to.identity())
+ return send(head, id->finalOwner(), text);
+ throw std::runtime_error("peer without known identity");
+}
+
+DirectMessage DirectMessageService::send(const Identity & to, const string & text)
+{
+ return send(server.localHead(), to, text);
+}
+
DirectMessage DirectMessageService::send(const Contact & to, const string & text)
{
if (auto id = to.identity())
@@ -448,8 +464,46 @@ void DirectMessageService::updateHandler(const DirectMessageThreads & threads)
auto dmt = threads.thread(peer);
for (const auto & w : config.watchers)
w(dmt, -1, -1);
+
+ if (auto netPeer = server.peer(peer))
+ syncWithPeer(server.localHead(), dmt, *netPeer);
}
prevState = move(state);
}
}
+
+void DirectMessageService::syncWithPeer(const Head<LocalState> & head, const DirectMessageThread & thread, const Peer & peer)
+{
+ for (const auto & msg : thread.p->head)
+ peer.send(myUUID, msg.ref());
+
+ head.update([&](const Stored<LocalState> & loc) {
+ auto st = head.storage();
+
+ auto threads = loc->shared<DirectMessageThreads>();
+
+ vector<Stored<DirectMessageData>> oldSent;
+ for (const auto & c : findThreadComponents(threads.data(), thread.peer(), &DirectMessageState::sent))
+ for (const auto & m : c->sent)
+ oldSent.push_back(m);
+ filterAncestors(oldSent);
+
+ auto newSent = oldSent;
+ for (const auto & msg : thread.p->head)
+ newSent.push_back(msg);
+ filterAncestors(newSent);
+
+ if (newSent != oldSent) {
+ auto state = st.store(DirectMessageState {
+ .prev = threads.data(),
+ .peer = thread.peer(),
+ .sent = move(newSent),
+ });
+
+ return st.store(loc->shared<DirectMessageThreads>(DirectMessageThreads(state)));
+ }
+
+ return loc;
+ });
+}
diff --git a/test/message.test b/test/message.test
index e317116..f630c59 100644
--- a/test/message.test
+++ b/test/message.test
@@ -88,11 +88,17 @@ test DirectMessage:
# Reload message history
- with p1:
- send "stop-server"
- expect /stop-server-done/
- send "start-server"
+ for p in [p1, p2]:
+ with p:
+ send "stop-server"
+ for p in [p1, p2]:
+ with p:
+ expect /stop-server-done/
+ for p in [p1, p2]:
+ with p:
+ send "start-server"
+ with p1:
send "contact-list"
expect:
/contact-list-item $c1_2 Owner2 Owner2/
@@ -111,3 +117,18 @@ test DirectMessage:
expect /dm-list-item from Owner2 text hi$i/
expect /dm-list-(.*)/ capture done
guard (done == "done")
+
+ # Send message while offline
+
+ for p in [p1, p2]:
+ with p:
+ send "stop-server"
+ for p in [p1, p2]:
+ with p:
+ expect /stop-server-done/
+ send "start-server" to p2
+
+ send "dm-send-contact $c1_2 while_offline" to p1
+ send "start-server" to p1
+
+ expect /dm-received from Owner1 text while_offline/ from p2