summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp85
1 files changed, 78 insertions, 7 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 9a9dc41..ce0dd30 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -1,6 +1,7 @@
#include "network.h"
#include "identity.h"
+#include "service.h"
#include <algorithm>
#include <cstring>
@@ -18,8 +19,8 @@ using std::unique_lock;
using namespace erebos;
-Server::Server(const Identity & self):
- p(new Priv(self))
+Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs):
+ p(new Priv(self, std::move(svcs)))
{
}
@@ -68,6 +69,28 @@ void Peer::Priv::notifyWatchers()
}
}
+bool Peer::hasChannel() const
+{
+ if (auto speer = p->speer.lock())
+ return holds_alternative<Stored<Channel>>(speer->channel);
+ return false;
+}
+
+bool Peer::send(UUID uuid, const Ref & ref) const
+{
+ if (hasChannel())
+ if (auto speer = p->speer.lock()) {
+ TransportHeader header({
+ { TransportHeader::Type::ServiceType, uuid },
+ { TransportHeader::Type::ServiceRef, ref },
+ });
+ speer->send(header, { *ref });
+ return true;
+ }
+
+ return false;
+}
+
PeerList::PeerList(): p(new Priv) {}
PeerList::PeerList(const shared_ptr<PeerList::Priv> & p): p(p) {}
@@ -106,8 +129,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
}
-Server::Priv::Priv(const Identity & self):
- self(self)
+Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs):
+ self(self),
+ services(std::move(svcs))
{
struct ifaddrs * raddrs;
if (getifaddrs(&raddrs) < 0)
@@ -208,6 +232,7 @@ void Server::Priv::doListen()
handlePacket(peer, *header, reply);
peer.updateIdentity(reply);
peer.updateChannel(reply);
+ peer.updateService(reply);
if (!reply.header().empty())
peer.send(TransportHeader(reply.header()), reply.body());
@@ -276,6 +301,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
plaintextRefs.insert(obj.ref.digest());
+ optional<UUID> serviceType;
+
for (auto & item : header.items) {
switch (item.type) {
case TransportHeader::Type::Acknowledged:
@@ -379,11 +406,31 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
case TransportHeader::Type::ServiceType:
+ if (!serviceType)
+ serviceType = std::get<UUID>(item.value);
break;
case TransportHeader::Type::ServiceRef:
- break;
+ if (!serviceType)
+ for (auto & item : header.items)
+ if (item.type == TransportHeader::Type::ServiceType) {
+ serviceType = std::get<UUID>(item.value);
+ break;
+ }
+ if (!serviceType)
+ break;
+ if (auto pref = std::get<PartialRef>(item.value)) {
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.serviceQueue.emplace_back(*serviceType, wref);
+ wref->check(reply);
+ }
}
}
}
@@ -457,6 +504,30 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
}
}
+void Server::Peer::updateService(ReplyBuilder & reply)
+{
+ decltype(serviceQueue) next;
+ for (auto & x : serviceQueue) {
+ if (auto ref = std::get<1>(x)->check(reply)) {
+ if (lpeer) {
+ Service::Context ctx(new Service::Context::Priv {
+ .ref = *ref,
+ .peer = erebos::Peer(lpeer),
+ });
+
+ for (auto & svc : server.services)
+ if (svc->uuid() == std::get<UUID>(x)) {
+ svc->handle(ctx);
+ break;
+ }
+ }
+ } else {
+ next.push_back(std::move(x));
+ }
+ }
+ serviceQueue = std::move(next);
+}
+
void ReplyBuilder::header(TransportHeader::Item && item)
{
@@ -557,7 +628,7 @@ optional<TransportHeader> TransportHeader::load(const PartialObject & obj)
.value = *ref,
});
} else if (item.name == "STP") {
- if (auto val = item.asText())
+ if (auto val = item.asUUID())
items.emplace_back(Item {
.type = Type::ServiceType,
.value = *val,
@@ -609,7 +680,7 @@ PartialObject TransportHeader::toObject() const
break;
case Type::ServiceType:
- ritems.emplace_back("STP", std::get<string>(item.value));
+ ritems.emplace_back("STP", std::get<UUID>(item.value));
break;
case Type::ServiceRef: