diff options
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 85 |
1 files changed, 78 insertions, 7 deletions
diff --git a/src/network.cpp b/src/network.cpp index 9a9dc41..ce0dd30 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -1,6 +1,7 @@ #include "network.h" #include "identity.h" +#include "service.h" #include <algorithm> #include <cstring> @@ -18,8 +19,8 @@ using std::unique_lock; using namespace erebos; -Server::Server(const Identity & self): - p(new Priv(self)) +Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs): + p(new Priv(self, std::move(svcs))) { } @@ -68,6 +69,28 @@ void Peer::Priv::notifyWatchers() } } +bool Peer::hasChannel() const +{ + if (auto speer = p->speer.lock()) + return holds_alternative<Stored<Channel>>(speer->channel); + return false; +} + +bool Peer::send(UUID uuid, const Ref & ref) const +{ + if (hasChannel()) + if (auto speer = p->speer.lock()) { + TransportHeader header({ + { TransportHeader::Type::ServiceType, uuid }, + { TransportHeader::Type::ServiceRef, ref }, + }); + speer->send(header, { *ref }); + return true; + } + + return false; +} + PeerList::PeerList(): p(new Priv) {} PeerList::PeerList(const shared_ptr<PeerList::Priv> & p): p(p) {} @@ -106,8 +129,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w) } -Server::Priv::Priv(const Identity & self): - self(self) +Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs): + self(self), + services(std::move(svcs)) { struct ifaddrs * raddrs; if (getifaddrs(&raddrs) < 0) @@ -208,6 +232,7 @@ void Server::Priv::doListen() handlePacket(peer, *header, reply); peer.updateIdentity(reply); peer.updateChannel(reply); + peer.updateService(reply); if (!reply.header().empty()) peer.send(TransportHeader(reply.header()), reply.body()); @@ -276,6 +301,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + optional<UUID> serviceType; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -379,11 +406,31 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea break; case TransportHeader::Type::ServiceType: + if (!serviceType) + serviceType = std::get<UUID>(item.value); break; case TransportHeader::Type::ServiceRef: - break; + if (!serviceType) + for (auto & item : header.items) + if (item.type == TransportHeader::Type::ServiceType) { + serviceType = std::get<UUID>(item.value); + break; + } + if (!serviceType) + break; + if (auto pref = std::get<PartialRef>(item.value)) { + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.serviceQueue.emplace_back(*serviceType, wref); + wref->check(reply); + } } } } @@ -457,6 +504,30 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) } } +void Server::Peer::updateService(ReplyBuilder & reply) +{ + decltype(serviceQueue) next; + for (auto & x : serviceQueue) { + if (auto ref = std::get<1>(x)->check(reply)) { + if (lpeer) { + Service::Context ctx(new Service::Context::Priv { + .ref = *ref, + .peer = erebos::Peer(lpeer), + }); + + for (auto & svc : server.services) + if (svc->uuid() == std::get<UUID>(x)) { + svc->handle(ctx); + break; + } + } + } else { + next.push_back(std::move(x)); + } + } + serviceQueue = std::move(next); +} + void ReplyBuilder::header(TransportHeader::Item && item) { @@ -557,7 +628,7 @@ optional<TransportHeader> TransportHeader::load(const PartialObject & obj) .value = *ref, }); } else if (item.name == "STP") { - if (auto val = item.asText()) + if (auto val = item.asUUID()) items.emplace_back(Item { .type = Type::ServiceType, .value = *val, @@ -609,7 +680,7 @@ PartialObject TransportHeader::toObject() const break; case Type::ServiceType: - ritems.emplace_back("STP", std::get<string>(item.value)); + ritems.emplace_back("STP", std::get<UUID>(item.value)); break; case Type::ServiceRef: |