#include #include #include #include #include #include #include #include "storage.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using std::cerr; using std::cout; using std::endl; using std::function; using std::future; using std::invalid_argument; using std::make_unique; using std::map; using std::mutex; using std::optional; using std::ostringstream; using std::promise; using std::scoped_lock; using std::string; using std::thread; using std::to_string; using std::unique_ptr; using std::vector; namespace fs = std::filesystem; using namespace erebos; namespace { fs::path getErebosDir() { const char * value = getenv("EREBOS_DIR"); if (value) return value; return "./.erebos"; } mutex outputMutex; void printLine(const string & line) { scoped_lock lock(outputMutex); cout << line << std::endl; } Storage st(getErebosDir()); optional> h; optional server; struct TestPeer { Peer peer; size_t id; bool deleted = false; promise pairingAnswer {}; }; vector testPeers; TestPeer & getPeer(const string & name) { try { return testPeers.at(std::stoi(name) - 1); } catch (const std::invalid_argument &) {} for (auto & p : testPeers) if (p.peer.name() == name) return p; ostringstream ss; ss << "peer '" << name << "' not found"; throw invalid_argument(ss.str().c_str()); } TestPeer & getPeer(const Peer & peer) { for (auto & p : testPeers) if (p.peer == peer) return p; throw invalid_argument("peer not found"); } struct Command { string name; function &)> action; }; void store(const vector & args) { auto type = args.at(0); vector inner, data; char * line = nullptr; size_t size = 0; while (getline(&line, &size, stdin) > 0 && strlen(line) > 1) copy(line, line + strlen(line), std::back_inserter(inner)); free(line); auto inserter = std::back_inserter(data); copy(type.begin(), type.end(), inserter); inserter = ' '; auto slen = to_string(inner.size()); copy(slen.begin(), slen.end(), inserter); inserter = '\n'; copy(inner.begin(), inner.end(), inserter); auto digest = st.priv().storeBytes(data); ostringstream ss; ss << "store-done " << string(digest); printLine(ss.str()); } void storedGeneration(const vector & args) { auto ref = st.ref(Digest(args.at(0))); if (!ref) throw invalid_argument("ref " + args.at(0) + " not found"); ostringstream ss; ss << "stored-generation " << string(ref->digest()) << " " << string(ref->generation()); printLine(ss.str()); } void storedRoots(const vector & args) { auto ref = st.ref(Digest(args.at(0))); if (!ref) throw invalid_argument("ref " + args.at(0) + " not found"); ostringstream ss; ss << "stored-roots " << string(ref->digest()); for (const auto & dgst : ref->roots()) ss << " " << string(dgst); printLine(ss.str()); } void storedSetAdd(const vector & args) { auto iref = st.ref(Digest(args.at(0))); if (!iref) throw invalid_argument("ref " + args.at(0) + " not found"); auto set = args.size() > 1 ? Set>>::load({ *st.ref(Digest(args.at(1))) }) : Set>>(); ostringstream ss; ss << "stored-set-add"; for (const auto & d : set.add(st, { Stored::load(*iref) }).digests()) ss << " " << string(d); printLine(ss.str()); } void storedSetList(const vector & args) { auto ref = st.ref(Digest(args.at(0))); if (!ref) throw invalid_argument("ref " + args.at(0) + " not found"); for (const auto & vec : Set>>::load({ *ref }).view(std::less{})) { ostringstream ss; ss << "stored-set-item"; for (const auto & x : vec) ss << " " << string(x.ref().digest()); printLine(ss.str()); } printLine("stored-set-done"); } void createIdentity(const vector & args) { optional identity; for (ssize_t i = args.size() - 1; i >= 0; i--) { const auto & name = args[i]; auto builder = Identity::create(st); builder.name(name); if (identity) builder.owner(*identity); identity = builder.commit(); } if (identity) { auto nh = h->update([&identity] (const auto & loc) { auto ret = loc->identity(*identity); if (identity->owner()) ret = ret.template shared>(identity->finalOwner()); return st.store(ret); }); if (nh) *h = *nh; } } void printPairingResult(string prefix, Peer peer, future && future) { auto outcome = future.get(); ostringstream ss; ss << prefix << (outcome == PairingServiceBase::Outcome::Success ? "-done " : "-failed ") << getPeer(peer).id; switch (outcome) { case PairingServiceBase::Outcome::Success: break; case PairingServiceBase::Outcome::PeerRejected: ss << " rejected"; break; case PairingServiceBase::Outcome::UserRejected: ss << " user"; break; case PairingServiceBase::Outcome::UnexpectedMessage: ss << " unexpected"; break; case PairingServiceBase::Outcome::NonceMismatch: ss << " nonce"; break; case PairingServiceBase::Outcome::Stale: ss << " stale"; break; } printLine(ss.str()); } future confirmPairing(string prefix, const Peer & peer, string confirm, future && outcome) { thread(printPairingResult, prefix, peer, move(outcome)).detach(); promise promise; auto input = promise.get_future(); getPeer(peer).pairingAnswer = move(promise); ostringstream ss; ss << prefix << " " << getPeer(peer).id << " " << confirm; printLine(ss.str()); return input; } void startServer(const vector &) { vector> services; using namespace std::placeholders; auto atts = make_unique(); atts->onRequest(bind(confirmPairing, "attach-request", _1, _2, _3)); atts->onResponse(bind(confirmPairing, "attach-response", _1, _2, _3)); services.push_back(move(atts)); auto conts = make_unique(); conts->onRequest(bind(confirmPairing, "contact-request", _1, _2, _3)); conts->onResponse(bind(confirmPairing, "contact-response", _1, _2, _3)); services.push_back(move(conts)); services.push_back(make_unique()); server.emplace(*h, move(services)); server->peerList().onUpdate([](size_t idx, const Peer * peer) { size_t i = 0; while (idx > 0 && i < testPeers.size()) { if (!testPeers[i].deleted) idx--; i++; } string prefix = "peer " + to_string(i + 1); if (peer) { if (i >= testPeers.size()) { testPeers.push_back(TestPeer { .peer = *peer, .id = i + 1 }); const auto & paddr = peer->address(); ostringstream ss; ss << prefix << " addr " << inet_ntoa(paddr.sin_addr) << " " << ntohs(paddr.sin_port); printLine(ss.str()); } if (peer->identity()) { ostringstream ss; ss << prefix << " id"; for (auto idt = peer->identity(); idt; idt = idt->owner()) ss << " " << (idt->name() ? *idt->name() : ""); printLine(ss.str()); } } else { testPeers[i].deleted = true; printLine(prefix + " deleted"); } }); } void stopServer(const vector &) { server.reset(); } void sharedStateGet(const vector &) { ostringstream ss; ss << "shared-state-get"; for (const auto & r : h->behavior().lens>().get()) ss << " " << string(r.digest()); printLine(ss.str()); } void sharedStateWait(const vector & args) { static optional>> watched; watched = h->behavior().lens>().watch([args, watched = watched] (const vector & refs) mutable { vector> objs; objs.reserve(refs.size()); for (const auto & r : refs) objs.push_back(Stored::load(r)); auto objs2 = objs; for (const auto & a : args) if (auto ref = st.ref(Digest(a))) objs2.push_back(Stored::load(*ref)); else return; filterAncestors(objs2); if (objs2 == objs) { ostringstream ss; ss << "shared-state-wait"; for (const auto & a : args) ss << " " << a; printLine(ss.str()); watched = std::nullopt; } }); } void watchLocalIdentity(const vector &) { auto bhv = h->behavior().lens>(); static auto watchedLocalIdentity = bhv.watch([] (const optional & idt) { if (idt) { ostringstream ss; ss << "local-identity"; for (optional i = idt; i; i = i->owner()) ss << " " << i->name().value(); printLine(ss.str()); } }); } void watchSharedIdentity(const vector &) { auto bhv = h->behavior().lens().lens>(); static auto watchedSharedIdentity = bhv.watch([] (const optional & idt) { if (idt) { ostringstream ss; ss << "shared-identity"; for (optional i = idt; i; i = i->owner()) ss << " " << i->name().value(); printLine(ss.str()); } }); } void updateLocalIdentity(const vector & params) { if (params.size() != 1) { throw invalid_argument("usage: update-local-identity "); } auto nh = h->update([¶ms] (const Stored & loc) { auto st = loc.ref().storage(); auto b = loc->identity()->modify(); b.name(params[0]); return st.store(loc->identity(b.commit())); }); if (nh) *h = *nh; } void updateSharedIdentity(const vector & params) { if (params.size() != 1) { throw invalid_argument("usage: update-shared-identity "); } auto nh = h->update([¶ms] (const Stored & loc) { auto st = loc.ref().storage(); auto mbid = loc->shared>(); if (!mbid) return loc; auto b = mbid->modify(); b.name(params[0]); return st.store(loc->shared>(optional(b.commit()))); }); if (nh) *h = *nh; } void attachTo(const vector & params) { server->svc().attachTo(getPeer(params.at(0)).peer); } void attachAccept(const vector & params) { getPeer(params.at(0)).pairingAnswer.set_value(true); } void attachReject(const vector & params) { getPeer(params.at(0)).pairingAnswer.set_value(false); } void contactRequest(const vector & params) { server->svc().request(getPeer(params.at(0)).peer); } void contactAccept(const vector & params) { getPeer(params.at(0)).pairingAnswer.set_value(true); } void contactReject(const vector & params) { getPeer(params.at(0)).pairingAnswer.set_value(false); } void contactList(const vector &) { auto cmp = [](const Contact & x, const Contact & y) { return x.data() < y.data(); }; for (const auto & c : h->behavior().lens().lens>().get().view(cmp)) { ostringstream ss; ss << "contact-list-item " << c.name(); if (auto id = c.identity()) if (auto iname = id->name()) ss << " " << *iname; printLine(ss.str()); } printLine("contact-list-done"); } vector commands = { { "store", store }, { "stored-generation", storedGeneration }, { "stored-roots", storedRoots }, { "stored-set-add", storedSetAdd }, { "stored-set-list", storedSetList }, { "create-identity", createIdentity }, { "start-server", startServer }, { "stop-server", stopServer }, { "shared-state-get", sharedStateGet }, { "shared-state-wait", sharedStateWait }, { "watch-local-identity", watchLocalIdentity }, { "watch-shared-identity", watchSharedIdentity }, { "update-local-identity", updateLocalIdentity }, { "update-shared-identity", updateSharedIdentity }, { "attach-to", attachTo }, { "attach-accept", attachAccept }, { "attach-reject", attachReject }, { "contact-request", contactRequest }, { "contact-accept", contactAccept }, { "contact-reject", contactReject }, { "contact-list", contactList }, }; } int main(int argc, char * argv[]) { h.emplace([] { auto hs = st.heads(); if (!hs.empty()) return hs[0]; else return st.storeHead(LocalState()); }()); char * line = nullptr; size_t size = 0; if (argc > 1) { vector args; for (int i = 2; i < argc; i++) args.emplace_back(argv[i]); for (const auto & cmd : commands) { if (cmd.name == argv[1]) { cmd.action(args); return 0; } } cerr << "Unknown command: '" << argv[1] << "'" << endl; return 1; } while (getline(&line, &size, stdin) > 0) { optional command; vector args; const char * last = line; for (const char * cur = line;; cur++) { if (isspace(*cur) || *cur == '\0') { if (last < cur) { if (!command) command.emplace(last, cur); else args.emplace_back(last, cur); } last = cur + 1; if (*cur == '\0') break; } } if (!command) continue; bool found = false; for (const auto & cmd : commands) { if (cmd.name == *command) { found = true; cmd.action(args); break; } } if (!found) cerr << "Unknown command: '" << *command << "'" << endl; } free(line); server.reset(); return 0; }