From 240edf7494745dc4df2128644fe5c1a73ec2d513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 29 Feb 2020 20:38:38 +0100 Subject: Network: service interface and handling --- include/erebos/network.h | 6 +++- include/erebos/service.h | 31 ++++++++++++++++++ src/CMakeLists.txt | 1 + src/network.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++++---- src/network.h | 11 +++++-- src/service.cpp | 25 ++++++++++++++ src/service.h | 14 ++++++++ 7 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 include/erebos/service.h create mode 100644 src/service.cpp create mode 100644 src/service.h diff --git a/include/erebos/network.h b/include/erebos/network.h index 90c85a6..d730fb5 100644 --- a/include/erebos/network.h +++ b/include/erebos/network.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -9,7 +10,7 @@ namespace erebos { class Server { public: - Server(const Identity &); + Server(const Identity &, std::vector> &&); ~Server(); class PeerList & peerList() const; @@ -30,6 +31,9 @@ public: std::string name() const; std::optional identity() const; + bool hasChannel() const; + bool send(UUID, const Ref &) const; + private: std::shared_ptr p; }; diff --git a/include/erebos/service.h b/include/erebos/service.h new file mode 100644 index 0000000..7a6f646 --- /dev/null +++ b/include/erebos/service.h @@ -0,0 +1,31 @@ +#pragma once + +#include + +namespace erebos { + +class Service +{ +public: + Service(); + virtual ~Service(); + + class Context + { + public: + struct Priv; + Context(Priv *); + Priv & priv(); + + const Ref & ref() const; + const class Peer & peer() const; + + private: + std::unique_ptr p; + }; + + virtual UUID uuid() const = 0; + virtual void handle(Context &) const = 0; +}; + +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a50bf66..2bbc7d0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,5 +7,6 @@ add_library(erebos identity network pubkey + service storage ) diff --git a/src/network.cpp b/src/network.cpp index 9a9dc41..ce0dd30 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -1,6 +1,7 @@ #include "network.h" #include "identity.h" +#include "service.h" #include #include @@ -18,8 +19,8 @@ using std::unique_lock; using namespace erebos; -Server::Server(const Identity & self): - p(new Priv(self)) +Server::Server(const Identity & self, vector> && svcs): + p(new Priv(self, std::move(svcs))) { } @@ -68,6 +69,28 @@ void Peer::Priv::notifyWatchers() } } +bool Peer::hasChannel() const +{ + if (auto speer = p->speer.lock()) + return holds_alternative>(speer->channel); + return false; +} + +bool Peer::send(UUID uuid, const Ref & ref) const +{ + if (hasChannel()) + if (auto speer = p->speer.lock()) { + TransportHeader header({ + { TransportHeader::Type::ServiceType, uuid }, + { TransportHeader::Type::ServiceRef, ref }, + }); + speer->send(header, { *ref }); + return true; + } + + return false; +} + PeerList::PeerList(): p(new Priv) {} PeerList::PeerList(const shared_ptr & p): p(p) {} @@ -106,8 +129,9 @@ void PeerList::onUpdate(function w) } -Server::Priv::Priv(const Identity & self): - self(self) +Server::Priv::Priv(const Identity & self, vector> && svcs): + self(self), + services(std::move(svcs)) { struct ifaddrs * raddrs; if (getifaddrs(&raddrs) < 0) @@ -208,6 +232,7 @@ void Server::Priv::doListen() handlePacket(peer, *header, reply); peer.updateIdentity(reply); peer.updateChannel(reply); + peer.updateService(reply); if (!reply.header().empty()) peer.send(TransportHeader(reply.header()), reply.body()); @@ -276,6 +301,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea for (const auto & obj : collectStoredObjects(*Stored::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + optional serviceType; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -379,11 +406,31 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; case TransportHeader::Type::ServiceType: + if (!serviceType) + serviceType = std::get(item.value); break; case TransportHeader::Type::ServiceRef: - break; + if (!serviceType) + for (auto & item : header.items) + if (item.type == TransportHeader::Type::ServiceType) { + serviceType = std::get(item.value); + break; + } + if (!serviceType) + break; + if (auto pref = std::get(item.value)) { + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.serviceQueue.emplace_back(*serviceType, wref); + wref->check(reply); + } } } } @@ -457,6 +504,30 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } } +void Server::Peer::updateService(ReplyBuilder & reply) +{ + decltype(serviceQueue) next; + for (auto & x : serviceQueue) { + if (auto ref = std::get<1>(x)->check(reply)) { + if (lpeer) { + Service::Context ctx(new Service::Context::Priv { + .ref = *ref, + .peer = erebos::Peer(lpeer), + }); + + for (auto & svc : server.services) + if (svc->uuid() == std::get(x)) { + svc->handle(ctx); + break; + } + } + } else { + next.push_back(std::move(x)); + } + } + serviceQueue = std::move(next); +} + void ReplyBuilder::header(TransportHeader::Item && item) { @@ -557,7 +628,7 @@ optional TransportHeader::load(const PartialObject & obj) .value = *ref, }); } else if (item.name == "STP") { - if (auto val = item.asText()) + if (auto val = item.asUUID()) items.emplace_back(Item { .type = Type::ServiceType, .value = *val, @@ -609,7 +680,7 @@ PartialObject TransportHeader::toObject() const break; case Type::ServiceType: - ritems.emplace_back("STP", std::get(item.value)); + ritems.emplace_back("STP", std::get(item.value)); break; case Type::ServiceRef: diff --git a/src/network.h b/src/network.h index 13bb031..9b146a9 100644 --- a/src/network.h +++ b/src/network.h @@ -21,6 +21,7 @@ using std::thread; using std::unique_ptr; using std::variant; using std::vector; +using std::tuple; using std::weak_ptr; using std::enable_shared_from_this; @@ -31,6 +32,7 @@ using chrono::steady_clock; namespace erebos { class ReplyBuilder; +struct WaitingRef; struct Server::Peer { @@ -53,11 +55,14 @@ struct Server::Peer Storage tempStorage; PartialStorage partStorage; + vector>> serviceQueue {}; + shared_ptr lpeer = nullptr; void send(const struct TransportHeader &, const vector &) const; void updateIdentity(ReplyBuilder &); void updateChannel(ReplyBuilder &); + void updateService(ReplyBuilder &); }; struct Peer::Priv : enable_shared_from_this @@ -94,7 +99,7 @@ struct TransportHeader struct Item { const Type type; - const variant value; + const variant value; }; TransportHeader(const vector & items): items(items) {} @@ -131,7 +136,7 @@ struct WaitingRef struct Server::Priv { - Priv(const Identity & self); + Priv(const Identity & self, vector> && svcs); ~Priv(); void doListen(); void doAnnounce(); @@ -147,6 +152,8 @@ struct Server::Priv bool finish = false; Identity self; + vector> services; + thread threadListen; thread threadAnnounce; diff --git a/src/service.cpp b/src/service.cpp new file mode 100644 index 0000000..f8217c8 --- /dev/null +++ b/src/service.cpp @@ -0,0 +1,25 @@ +#include "service.h" + +using namespace erebos; + +Service::Service() = default; +Service::~Service() = default; + +Service::Context::Context(Priv * p): + p(p) +{} + +Service::Context::Priv & Service::Context::priv() +{ + return *p; +} + +const Ref & Service::Context::ref() const +{ + return p->ref; +} + +const Peer & Service::Context::peer() const +{ + return p->peer; +} diff --git a/src/service.h b/src/service.h new file mode 100644 index 0000000..9980471 --- /dev/null +++ b/src/service.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace erebos { + +struct Service::Context::Priv +{ + Ref ref; + Peer peer; +}; + +} -- cgit v1.2.3