diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2020-02-29 20:38:38 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2020-03-10 21:47:29 +0100 |
commit | 240edf7494745dc4df2128644fe5c1a73ec2d513 (patch) | |
tree | a952721545715e4d131c1fcc29c0f872d6a3bfbe /src | |
parent | 76d6f638df485d179899fa740b9bb53ee55ba7bc (diff) |
Network: service interface and handling
Diffstat (limited to 'src')
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/network.cpp | 85 | ||||
-rw-r--r-- | src/network.h | 11 | ||||
-rw-r--r-- | src/service.cpp | 25 | ||||
-rw-r--r-- | src/service.h | 14 |
5 files changed, 127 insertions, 9 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a50bf66..2bbc7d0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,5 +7,6 @@ add_library(erebos identity network pubkey + service storage ) diff --git a/src/network.cpp b/src/network.cpp index 9a9dc41..ce0dd30 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -1,6 +1,7 @@ #include "network.h" #include "identity.h" +#include "service.h" #include <algorithm> #include <cstring> @@ -18,8 +19,8 @@ using std::unique_lock; using namespace erebos; -Server::Server(const Identity & self): - p(new Priv(self)) +Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs): + p(new Priv(self, std::move(svcs))) { } @@ -68,6 +69,28 @@ void Peer::Priv::notifyWatchers() } } +bool Peer::hasChannel() const +{ + if (auto speer = p->speer.lock()) + return holds_alternative<Stored<Channel>>(speer->channel); + return false; +} + +bool Peer::send(UUID uuid, const Ref & ref) const +{ + if (hasChannel()) + if (auto speer = p->speer.lock()) { + TransportHeader header({ + { TransportHeader::Type::ServiceType, uuid }, + { TransportHeader::Type::ServiceRef, ref }, + }); + speer->send(header, { *ref }); + return true; + } + + return false; +} + PeerList::PeerList(): p(new Priv) {} PeerList::PeerList(const shared_ptr<PeerList::Priv> & p): p(p) {} @@ -106,8 +129,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w) } -Server::Priv::Priv(const Identity & self): - self(self) +Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs): + self(self), + services(std::move(svcs)) { struct ifaddrs * raddrs; if (getifaddrs(&raddrs) < 0) @@ -208,6 +232,7 @@ void Server::Priv::doListen() handlePacket(peer, *header, reply); peer.updateIdentity(reply); peer.updateChannel(reply); + peer.updateService(reply); if (!reply.header().empty()) peer.send(TransportHeader(reply.header()), reply.body()); @@ -276,6 +301,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + optional<UUID> serviceType; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -379,11 +406,31 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; case TransportHeader::Type::ServiceType: + if (!serviceType) + serviceType = std::get<UUID>(item.value); break; case TransportHeader::Type::ServiceRef: - break; + if (!serviceType) + for (auto & item : header.items) + if (item.type == TransportHeader::Type::ServiceType) { + serviceType = std::get<UUID>(item.value); + break; + } + if (!serviceType) + break; + if (auto pref = std::get<PartialRef>(item.value)) { + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.serviceQueue.emplace_back(*serviceType, wref); + wref->check(reply); + } } } } @@ -457,6 +504,30 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } } +void Server::Peer::updateService(ReplyBuilder & reply) +{ + decltype(serviceQueue) next; + for (auto & x : serviceQueue) { + if (auto ref = std::get<1>(x)->check(reply)) { + if (lpeer) { + Service::Context ctx(new Service::Context::Priv { + .ref = *ref, + .peer = erebos::Peer(lpeer), + }); + + for (auto & svc : server.services) + if (svc->uuid() == std::get<UUID>(x)) { + svc->handle(ctx); + break; + } + } + } else { + next.push_back(std::move(x)); + } + } + serviceQueue = std::move(next); +} + void ReplyBuilder::header(TransportHeader::Item && item) { @@ -557,7 +628,7 @@ optional<TransportHeader> TransportHeader::load(const PartialObject & obj) .value = *ref, }); } else if (item.name == "STP") { - if (auto val = item.asText()) + if (auto val = item.asUUID()) items.emplace_back(Item { .type = Type::ServiceType, .value = *val, @@ -609,7 +680,7 @@ PartialObject TransportHeader::toObject() const break; case Type::ServiceType: - ritems.emplace_back("STP", std::get<string>(item.value)); + ritems.emplace_back("STP", std::get<UUID>(item.value)); break; case Type::ServiceRef: diff --git a/src/network.h b/src/network.h index 13bb031..9b146a9 100644 --- a/src/network.h +++ b/src/network.h @@ -21,6 +21,7 @@ using std::thread; using std::unique_ptr; using std::variant; using std::vector; +using std::tuple; using std::weak_ptr; using std::enable_shared_from_this; @@ -31,6 +32,7 @@ using chrono::steady_clock; namespace erebos { class ReplyBuilder; +struct WaitingRef; struct Server::Peer { @@ -53,11 +55,14 @@ struct Server::Peer Storage tempStorage; PartialStorage partStorage; + vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {}; + shared_ptr<erebos::Peer::Priv> lpeer = nullptr; void send(const struct TransportHeader &, const vector<Object> &) const; void updateIdentity(ReplyBuilder &); void updateChannel(ReplyBuilder &); + void updateService(ReplyBuilder &); }; struct Peer::Priv : enable_shared_from_this<Peer::Priv> @@ -94,7 +99,7 @@ struct TransportHeader struct Item { const Type type; - const variant<PartialRef, string> value; + const variant<PartialRef, UUID> value; }; TransportHeader(const vector<Item> & items): items(items) {} @@ -131,7 +136,7 @@ struct WaitingRef struct Server::Priv { - Priv(const Identity & self); + Priv(const Identity & self, vector<unique_ptr<Service>> && svcs); ~Priv(); void doListen(); void doAnnounce(); @@ -147,6 +152,8 @@ struct Server::Priv bool finish = false; Identity self; + vector<unique_ptr<Service>> services; + thread threadListen; thread threadAnnounce; diff --git a/src/service.cpp b/src/service.cpp new file mode 100644 index 0000000..f8217c8 --- /dev/null +++ b/src/service.cpp @@ -0,0 +1,25 @@ +#include "service.h" + +using namespace erebos; + +Service::Service() = default; +Service::~Service() = default; + +Service::Context::Context(Priv * p): + p(p) +{} + +Service::Context::Priv & Service::Context::priv() +{ + return *p; +} + +const Ref & Service::Context::ref() const +{ + return p->ref; +} + +const Peer & Service::Context::peer() const +{ + return p->peer; +} diff --git a/src/service.h b/src/service.h new file mode 100644 index 0000000..9980471 --- /dev/null +++ b/src/service.h @@ -0,0 +1,14 @@ +#pragma once + +#include <erebos/network.h> +#include <erebos/service.h> + +namespace erebos { + +struct Service::Context::Priv +{ + Ref ref; + Peer peer; +}; + +} |