summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/network.h6
-rw-r--r--include/erebos/service.h31
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/network.cpp85
-rw-r--r--src/network.h11
-rw-r--r--src/service.cpp25
-rw-r--r--src/service.h14
7 files changed, 163 insertions, 10 deletions
diff --git a/include/erebos/network.h b/include/erebos/network.h
index 90c85a6..d730fb5 100644
--- a/include/erebos/network.h
+++ b/include/erebos/network.h
@@ -1,6 +1,7 @@
#pragma once
#include <erebos/identity.h>
+#include <erebos/service.h>
#include <functional>
@@ -9,7 +10,7 @@ namespace erebos {
class Server
{
public:
- Server(const Identity &);
+ Server(const Identity &, std::vector<std::unique_ptr<Service>> &&);
~Server();
class PeerList & peerList() const;
@@ -30,6 +31,9 @@ public:
std::string name() const;
std::optional<Identity> identity() const;
+ bool hasChannel() const;
+ bool send(UUID, const Ref &) const;
+
private:
std::shared_ptr<Priv> p;
};
diff --git a/include/erebos/service.h b/include/erebos/service.h
new file mode 100644
index 0000000..7a6f646
--- /dev/null
+++ b/include/erebos/service.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <erebos/storage.h>
+
+namespace erebos {
+
+class Service
+{
+public:
+ Service();
+ virtual ~Service();
+
+ class Context
+ {
+ public:
+ struct Priv;
+ Context(Priv *);
+ Priv & priv();
+
+ const Ref & ref() const;
+ const class Peer & peer() const;
+
+ private:
+ std::unique_ptr<Priv> p;
+ };
+
+ virtual UUID uuid() const = 0;
+ virtual void handle(Context &) const = 0;
+};
+
+}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a50bf66..2bbc7d0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -7,5 +7,6 @@ add_library(erebos
identity
network
pubkey
+ service
storage
)
diff --git a/src/network.cpp b/src/network.cpp
index 9a9dc41..ce0dd30 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -1,6 +1,7 @@
#include "network.h"
#include "identity.h"
+#include "service.h"
#include <algorithm>
#include <cstring>
@@ -18,8 +19,8 @@ using std::unique_lock;
using namespace erebos;
-Server::Server(const Identity & self):
- p(new Priv(self))
+Server::Server(const Identity & self, vector<unique_ptr<Service>> && svcs):
+ p(new Priv(self, std::move(svcs)))
{
}
@@ -68,6 +69,28 @@ void Peer::Priv::notifyWatchers()
}
}
+bool Peer::hasChannel() const
+{
+ if (auto speer = p->speer.lock())
+ return holds_alternative<Stored<Channel>>(speer->channel);
+ return false;
+}
+
+bool Peer::send(UUID uuid, const Ref & ref) const
+{
+ if (hasChannel())
+ if (auto speer = p->speer.lock()) {
+ TransportHeader header({
+ { TransportHeader::Type::ServiceType, uuid },
+ { TransportHeader::Type::ServiceRef, ref },
+ });
+ speer->send(header, { *ref });
+ return true;
+ }
+
+ return false;
+}
+
PeerList::PeerList(): p(new Priv) {}
PeerList::PeerList(const shared_ptr<PeerList::Priv> & p): p(p) {}
@@ -106,8 +129,9 @@ void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
}
-Server::Priv::Priv(const Identity & self):
- self(self)
+Server::Priv::Priv(const Identity & self, vector<unique_ptr<Service>> && svcs):
+ self(self),
+ services(std::move(svcs))
{
struct ifaddrs * raddrs;
if (getifaddrs(&raddrs) < 0)
@@ -208,6 +232,7 @@ void Server::Priv::doListen()
handlePacket(peer, *header, reply);
peer.updateIdentity(reply);
peer.updateChannel(reply);
+ peer.updateService(reply);
if (!reply.header().empty())
peer.send(TransportHeader(reply.header()), reply.body());
@@ -276,6 +301,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
plaintextRefs.insert(obj.ref.digest());
+ optional<UUID> serviceType;
+
for (auto & item : header.items) {
switch (item.type) {
case TransportHeader::Type::Acknowledged:
@@ -379,11 +406,31 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
case TransportHeader::Type::ServiceType:
+ if (!serviceType)
+ serviceType = std::get<UUID>(item.value);
break;
case TransportHeader::Type::ServiceRef:
- break;
+ if (!serviceType)
+ for (auto & item : header.items)
+ if (item.type == TransportHeader::Type::ServiceType) {
+ serviceType = std::get<UUID>(item.value);
+ break;
+ }
+ if (!serviceType)
+ break;
+ if (auto pref = std::get<PartialRef>(item.value)) {
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.serviceQueue.emplace_back(*serviceType, wref);
+ wref->check(reply);
+ }
}
}
}
@@ -457,6 +504,30 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
}
}
+void Server::Peer::updateService(ReplyBuilder & reply)
+{
+ decltype(serviceQueue) next;
+ for (auto & x : serviceQueue) {
+ if (auto ref = std::get<1>(x)->check(reply)) {
+ if (lpeer) {
+ Service::Context ctx(new Service::Context::Priv {
+ .ref = *ref,
+ .peer = erebos::Peer(lpeer),
+ });
+
+ for (auto & svc : server.services)
+ if (svc->uuid() == std::get<UUID>(x)) {
+ svc->handle(ctx);
+ break;
+ }
+ }
+ } else {
+ next.push_back(std::move(x));
+ }
+ }
+ serviceQueue = std::move(next);
+}
+
void ReplyBuilder::header(TransportHeader::Item && item)
{
@@ -557,7 +628,7 @@ optional<TransportHeader> TransportHeader::load(const PartialObject & obj)
.value = *ref,
});
} else if (item.name == "STP") {
- if (auto val = item.asText())
+ if (auto val = item.asUUID())
items.emplace_back(Item {
.type = Type::ServiceType,
.value = *val,
@@ -609,7 +680,7 @@ PartialObject TransportHeader::toObject() const
break;
case Type::ServiceType:
- ritems.emplace_back("STP", std::get<string>(item.value));
+ ritems.emplace_back("STP", std::get<UUID>(item.value));
break;
case Type::ServiceRef:
diff --git a/src/network.h b/src/network.h
index 13bb031..9b146a9 100644
--- a/src/network.h
+++ b/src/network.h
@@ -21,6 +21,7 @@ using std::thread;
using std::unique_ptr;
using std::variant;
using std::vector;
+using std::tuple;
using std::weak_ptr;
using std::enable_shared_from_this;
@@ -31,6 +32,7 @@ using chrono::steady_clock;
namespace erebos {
class ReplyBuilder;
+struct WaitingRef;
struct Server::Peer
{
@@ -53,11 +55,14 @@ struct Server::Peer
Storage tempStorage;
PartialStorage partStorage;
+ vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {};
+
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
void send(const struct TransportHeader &, const vector<Object> &) const;
void updateIdentity(ReplyBuilder &);
void updateChannel(ReplyBuilder &);
+ void updateService(ReplyBuilder &);
};
struct Peer::Priv : enable_shared_from_this<Peer::Priv>
@@ -94,7 +99,7 @@ struct TransportHeader
struct Item {
const Type type;
- const variant<PartialRef, string> value;
+ const variant<PartialRef, UUID> value;
};
TransportHeader(const vector<Item> & items): items(items) {}
@@ -131,7 +136,7 @@ struct WaitingRef
struct Server::Priv
{
- Priv(const Identity & self);
+ Priv(const Identity & self, vector<unique_ptr<Service>> && svcs);
~Priv();
void doListen();
void doAnnounce();
@@ -147,6 +152,8 @@ struct Server::Priv
bool finish = false;
Identity self;
+ vector<unique_ptr<Service>> services;
+
thread threadListen;
thread threadAnnounce;
diff --git a/src/service.cpp b/src/service.cpp
new file mode 100644
index 0000000..f8217c8
--- /dev/null
+++ b/src/service.cpp
@@ -0,0 +1,25 @@
+#include "service.h"
+
+using namespace erebos;
+
+Service::Service() = default;
+Service::~Service() = default;
+
+Service::Context::Context(Priv * p):
+ p(p)
+{}
+
+Service::Context::Priv & Service::Context::priv()
+{
+ return *p;
+}
+
+const Ref & Service::Context::ref() const
+{
+ return p->ref;
+}
+
+const Peer & Service::Context::peer() const
+{
+ return p->peer;
+}
diff --git a/src/service.h b/src/service.h
new file mode 100644
index 0000000..9980471
--- /dev/null
+++ b/src/service.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <erebos/network.h>
+#include <erebos/service.h>
+
+namespace erebos {
+
+struct Service::Context::Priv
+{
+ Ref ref;
+ Peer peer;
+};
+
+}