summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/network.cpp229
-rw-r--r--src/network.h42
-rw-r--r--src/network/protocol.cpp150
-rw-r--r--src/network/protocol.h36
4 files changed, 237 insertions, 220 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 786e752..ef06d6e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -111,11 +111,11 @@ void Server::addPeer(const string & node, const string & service) const
if (rp->ai_family == AF_INET6) {
Peer & peer = p->getPeer(*(sockaddr_in6 *)rp->ai_addr);
- vector<TransportHeader::Item> header;
+ vector<NetworkProtocol::Header::Item> header;
{
shared_lock lock(p->selfMutex);
- header.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceSelf, *p->self.ref() });
+ header.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() });
}
peer.send(header, {}, false);
return;
@@ -232,9 +232,9 @@ bool Peer::send(UUID uuid, const Object & obj) const
bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
{
if (auto speer = p->speer.lock()) {
- TransportHeader header({
- { TransportHeader::Type::ServiceType, uuid },
- { TransportHeader::Type::ServiceRef, ref },
+ NetworkProtocol::Header header({
+ { NetworkProtocol::Header::Type::ServiceType, uuid },
+ { NetworkProtocol::Header::Type::ServiceRef, ref },
});
speer->send(header, { obj }, true);
return true;
@@ -411,7 +411,7 @@ void Server::Priv::doListen()
if (auto dec = PartialObject::decodePrefix(peer->partStorage,
current->begin(), current->end())) {
- if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) {
+ if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) {
auto pos = std::get<1>(*dec);
while (auto cdec = PartialObject::decodePrefix(peer->partStorage,
pos, current->end())) {
@@ -430,7 +430,7 @@ void Server::Priv::doListen()
peer->updateService(reply);
if (!reply.header().empty())
- peer->send(TransportHeader(reply.header()), reply.body(), false);
+ peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false);
peer->trySendOutQueue();
}
@@ -450,8 +450,8 @@ void Server::Priv::doAnnounce()
if (lastAnnounce + announceInterval < now) {
shared_lock slock(selfMutex);
- TransportHeader header({
- { TransportHeader::Type::AnnounceSelf, *self.ref() }
+ NetworkProtocol::Header header({
+ { NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() }
});
vector<uint8_t> bytes = header.toObject().encode();
@@ -534,7 +534,7 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)
return *peer;
}
-void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & header, ReplyBuilder & reply)
+void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Header & header, ReplyBuilder & reply)
{
unordered_set<Digest> plaintextRefs;
for (const auto & obj : collectStoredObjects(Stored<Object>::load(*self.ref())))
@@ -544,7 +544,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
for (auto & item : header.items) {
switch (item.type) {
- case TransportHeader::Type::Acknowledged:
+ case NetworkProtocol::Header::Type::Acknowledged:
if (auto pref = std::get<PartialRef>(item.value)) {
if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest())
@@ -553,22 +553,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::DataRequest: {
+ case NetworkProtocol::Header::Type::DataRequest: {
auto pref = std::get<PartialRef>(item.value);
if (holds_alternative<unique_ptr<Channel>>(peer.channel) ||
plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
- TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- reply.header({ TransportHeader::Type::DataResponse, *ref });
+ NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref };
+ reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref });
reply.body(*ref);
}
}
break;
}
- case TransportHeader::Type::DataResponse:
+ case NetworkProtocol::Header::Type::DataResponse:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
for (auto & pwref : waiting) {
if (auto wref = pwref.lock()) {
if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
@@ -583,13 +583,13 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::AnnounceSelf: {
+ case NetworkProtocol::Header::Type::AnnounceSelf: {
auto pref = std::get<PartialRef>(item.value);
if (pref.digest() == self.ref()->digest())
break;
if (holds_alternative<monostate>(peer.identity)) {
- reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+ reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()});
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -604,10 +604,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
}
- case TransportHeader::Type::AnnounceUpdate:
+ case NetworkProtocol::Header::Type::AnnounceUpdate:
if (holds_alternative<Identity>(peer.identity)) {
auto pref = std::get<PartialRef>(item.value);
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -621,9 +621,9 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::ChannelRequest:
+ case NetworkProtocol::Header::Type::ChannelRequest:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest())
@@ -644,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::ChannelAccept:
+ case NetworkProtocol::Header::Type::ChannelAccept:
if (auto pref = std::get<PartialRef>(item.value)) {
if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest())
@@ -655,22 +655,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto acc = ChannelAccept::load(*r);
if (holds_alternative<Identity>(peer.identity) &&
acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
peer.finalizeChannel(reply, acc.data->channel());
}
}
}
break;
- case TransportHeader::Type::ServiceType:
+ case NetworkProtocol::Header::Type::ServiceType:
if (!serviceType)
serviceType = std::get<UUID>(item.value);
break;
- case TransportHeader::Type::ServiceRef:
+ case NetworkProtocol::Header::Type::ServiceRef:
if (!serviceType)
for (auto & item : header.items)
- if (item.type == TransportHeader::Type::ServiceType) {
+ if (item.type == NetworkProtocol::Header::Type::ServiceType) {
serviceType = std::get<UUID>(item.value);
break;
}
@@ -679,7 +679,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto pref = std::get<PartialRef>(item.value);
if (pref)
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -703,15 +703,15 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
if (*id != self) {
self = *id;
- vector<TransportHeader::Item> hitems;
+ vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : self.refs())
- hitems.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ hitems.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
for (const auto & r : self.updates())
- hitems.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ hitems.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
- TransportHeader header(hitems);
+ NetworkProtocol::Header header(hitems);
for (const auto & peer : peers)
peer->send(header, { **self.ref() }, false);
@@ -719,7 +719,7 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
}
}
-void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs, bool secure)
+void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure)
{
vector<uint8_t> data, part, out;
@@ -786,7 +786,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
channel.emplace<Stored<ChannelRequest>>(req);
- reply.header({ TransportHeader::Type::ChannelRequest, req.ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() });
reply.body(req.ref());
reply.body(req->data.ref());
reply.body(req->data->key.ref());
@@ -801,7 +801,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
req->isSignedBy(std::get<Identity>(identity).keyMessage())) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {
channel.emplace<Stored<ChannelAccept>>(*acc);
- reply.header({ TransportHeader::Type::ChannelAccept, acc->ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() });
reply.body(acc->ref());
reply.body(acc.value()->data.ref());
reply.body(acc.value()->data->key.ref());
@@ -821,13 +821,13 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)
{
channel.emplace<unique_ptr<Channel>>(move(ch));
- vector<TransportHeader::Item> hitems;
+ vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : server.self.refs())
- reply.header(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ reply.header(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
for (const auto & r : server.self.updates())
- reply.header(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ reply.header(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
}
void Server::Peer::updateService(ReplyBuilder & reply)
@@ -881,7 +881,7 @@ void Server::Peer::trySendOutQueue()
}
-void ReplyBuilder::header(TransportHeader::Item && item)
+void ReplyBuilder::header(NetworkProtocol::Header::Item && item)
{
for (const auto & x : mheader)
if (x == item)
@@ -926,146 +926,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply)
return r;
for (const auto & d : missing)
- reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+ reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) });
return nullopt;
}
-
-
-bool TransportHeader::Item::operator==(const Item & other) const
-{
- if (type != other.type)
- return false;
-
- if (value.index() != other.value.index())
- return false;
-
- if (holds_alternative<PartialRef>(value))
- return std::get<PartialRef>(value).digest() ==
- std::get<PartialRef>(other.value).digest();
-
- if (holds_alternative<UUID>(value))
- return std::get<UUID>(value) == std::get<UUID>(other.value);
-
- throw runtime_error("unhandled transport header item type");
-}
-
-optional<TransportHeader> TransportHeader::load(const PartialRef & ref)
-{
- return load(*ref);
-}
-
-optional<TransportHeader> TransportHeader::load(const PartialObject & obj)
-{
- auto rec = obj.asRecord();
- if (!rec)
- return nullopt;
-
- vector<Item> items;
- for (const auto & item : rec->items()) {
- if (item.name == "ACK") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::Acknowledged,
- .value = *ref,
- });
- } else if (item.name == "REQ") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::DataRequest,
- .value = *ref,
- });
- } else if (item.name == "RSP") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::DataResponse,
- .value = *ref,
- });
- } else if (item.name == "ANN") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::AnnounceSelf,
- .value = *ref,
- });
- } else if (item.name == "ANU") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::AnnounceUpdate,
- .value = *ref,
- });
- } else if (item.name == "CRQ") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ChannelRequest,
- .value = *ref,
- });
- } else if (item.name == "CAC") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ChannelAccept,
- .value = *ref,
- });
- } else if (item.name == "STP") {
- if (auto val = item.asUUID())
- items.emplace_back(Item {
- .type = Type::ServiceType,
- .value = *val,
- });
- } else if (item.name == "SRF") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ServiceRef,
- .value = *ref,
- });
- }
- }
-
- return TransportHeader(items);
-}
-
-PartialObject TransportHeader::toObject() const
-{
- vector<PartialRecord::Item> ritems;
-
- for (const auto & item : items) {
- switch (item.type) {
- case Type::Acknowledged:
- ritems.emplace_back("ACK", std::get<PartialRef>(item.value));
- break;
-
- case Type::DataRequest:
- ritems.emplace_back("REQ", std::get<PartialRef>(item.value));
- break;
-
- case Type::DataResponse:
- ritems.emplace_back("RSP", std::get<PartialRef>(item.value));
- break;
-
- case Type::AnnounceSelf:
- ritems.emplace_back("ANN", std::get<PartialRef>(item.value));
- break;
-
- case Type::AnnounceUpdate:
- ritems.emplace_back("ANU", std::get<PartialRef>(item.value));
- break;
-
- case Type::ChannelRequest:
- ritems.emplace_back("CRQ", std::get<PartialRef>(item.value));
- break;
-
- case Type::ChannelAccept:
- ritems.emplace_back("CAC", std::get<PartialRef>(item.value));
- break;
-
- case Type::ServiceType:
- ritems.emplace_back("STP", std::get<UUID>(item.value));
- break;
-
- case Type::ServiceRef:
- ritems.emplace_back("SRF", std::get<PartialRef>(item.value));
- break;
- }
- }
-
- return PartialObject(PartialRecord(std::move(ritems)));
-}
diff --git a/src/network.h b/src/network.h
index 74231bf..c3a2074 100644
--- a/src/network.h
+++ b/src/network.h
@@ -65,7 +65,7 @@ struct Server::Peer
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
- void send(const struct TransportHeader &, const vector<Object> &, bool secure);
+ void send(const NetworkProtocol::Header &, const vector<Object> &, bool secure);
void updateIdentity(ReplyBuilder &);
void updateChannel(ReplyBuilder &);
void finalizeChannel(ReplyBuilder &, unique_ptr<Channel>);
@@ -91,47 +91,17 @@ struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>
void push(const shared_ptr<Server::Peer> &);
};
-struct TransportHeader
-{
- enum class Type {
- Acknowledged,
- DataRequest,
- DataResponse,
- AnnounceSelf,
- AnnounceUpdate,
- ChannelRequest,
- ChannelAccept,
- ServiceType,
- ServiceRef,
- };
-
- struct Item {
- const Type type;
- const variant<PartialRef, UUID> value;
-
- bool operator==(const Item &) const;
- bool operator!=(const Item & other) const { return !(*this == other); }
- };
-
- TransportHeader(const vector<Item> & items): items(items) {}
- static optional<TransportHeader> load(const PartialRef &);
- static optional<TransportHeader> load(const PartialObject &);
- PartialObject toObject() const;
-
- const vector<Item> items;
-};
-
class ReplyBuilder
{
public:
- void header(TransportHeader::Item &&);
+ void header(NetworkProtocol::Header::Item &&);
void body(const Ref &);
- const vector<TransportHeader::Item> & header() const { return mheader; }
+ const vector<NetworkProtocol::Header::Item> & header() const { return mheader; }
vector<Object> body() const;
private:
- vector<TransportHeader::Item> mheader;
+ vector<NetworkProtocol::Header::Item> mheader;
vector<Ref> mbody;
};
@@ -160,7 +130,7 @@ struct Server::Priv
Peer * findPeer(NetworkProtocol::Connection::Id cid) const;
Peer & getPeer(const sockaddr_in6 & paddr);
Peer & addPeer(NetworkProtocol::Connection conn);
- void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &);
+ void handlePacket(Peer &, const NetworkProtocol::Header &, ReplyBuilder &);
void handleLocalHeadChange(const Head<LocalState> &);
@@ -181,7 +151,7 @@ struct Server::Priv
vector<shared_ptr<Peer>> peers;
PeerList plist;
- vector<struct TransportHeader> outgoing;
+ vector<struct NetworkProtocol::Header> outgoing;
vector<weak_ptr<WaitingRef>> waiting;
NetworkProtocol protocol;
diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp
index c247bf0..c2c6c5d 100644
--- a/src/network/protocol.cpp
+++ b/src/network/protocol.cpp
@@ -7,7 +7,10 @@
#include <mutex>
#include <system_error>
+using std::holds_alternative;
using std::move;
+using std::nullopt;
+using std::runtime_error;
using std::scoped_lock;
namespace erebos {
@@ -122,6 +125,10 @@ void NetworkProtocol::shutdown()
}
+/******************************************************************************/
+/* Connection */
+/******************************************************************************/
+
NetworkProtocol::Connection::Id NetworkProtocol::ConnectionPriv::id() const
{
return reinterpret_cast<uintptr_t>(this);
@@ -195,4 +202,147 @@ void NetworkProtocol::Connection::close()
p = nullptr;
}
+
+/******************************************************************************/
+/* Header */
+/******************************************************************************/
+
+bool NetworkProtocol::Header::Item::operator==(const Item & other) const
+{
+ if (type != other.type)
+ return false;
+
+ if (value.index() != other.value.index())
+ return false;
+
+ if (holds_alternative<PartialRef>(value))
+ return std::get<PartialRef>(value).digest() ==
+ std::get<PartialRef>(other.value).digest();
+
+ if (holds_alternative<UUID>(value))
+ return std::get<UUID>(value) == std::get<UUID>(other.value);
+
+ throw runtime_error("unhandled network header item type");
+}
+
+optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialRef & ref)
+{
+ return load(*ref);
+}
+
+optional<NetworkProtocol::Header> NetworkProtocol::Header::load(const PartialObject & obj)
+{
+ auto rec = obj.asRecord();
+ if (!rec)
+ return nullopt;
+
+ vector<Item> items;
+ for (const auto & item : rec->items()) {
+ if (item.name == "ACK") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::Acknowledged,
+ .value = *ref,
+ });
+ } else if (item.name == "REQ") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::DataRequest,
+ .value = *ref,
+ });
+ } else if (item.name == "RSP") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::DataResponse,
+ .value = *ref,
+ });
+ } else if (item.name == "ANN") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::AnnounceSelf,
+ .value = *ref,
+ });
+ } else if (item.name == "ANU") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::AnnounceUpdate,
+ .value = *ref,
+ });
+ } else if (item.name == "CRQ") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ChannelRequest,
+ .value = *ref,
+ });
+ } else if (item.name == "CAC") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ChannelAccept,
+ .value = *ref,
+ });
+ } else if (item.name == "STP") {
+ if (auto val = item.asUUID())
+ items.emplace_back(Item {
+ .type = Type::ServiceType,
+ .value = *val,
+ });
+ } else if (item.name == "SRF") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ServiceRef,
+ .value = *ref,
+ });
+ }
+ }
+
+ return NetworkProtocol::Header(items);
+}
+
+PartialObject NetworkProtocol::Header::toObject() const
+{
+ vector<PartialRecord::Item> ritems;
+
+ for (const auto & item : items) {
+ switch (item.type) {
+ case Type::Acknowledged:
+ ritems.emplace_back("ACK", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::DataRequest:
+ ritems.emplace_back("REQ", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::DataResponse:
+ ritems.emplace_back("RSP", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::AnnounceSelf:
+ ritems.emplace_back("ANN", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::AnnounceUpdate:
+ ritems.emplace_back("ANU", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::ChannelRequest:
+ ritems.emplace_back("CRQ", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::ChannelAccept:
+ ritems.emplace_back("CAC", std::get<PartialRef>(item.value));
+ break;
+
+ case Type::ServiceType:
+ ritems.emplace_back("STP", std::get<UUID>(item.value));
+ break;
+
+ case Type::ServiceRef:
+ ritems.emplace_back("SRF", std::get<PartialRef>(item.value));
+ break;
+ }
+ }
+
+ return PartialObject(PartialRecord(std::move(ritems)));
+}
+
}
diff --git a/src/network/protocol.h b/src/network/protocol.h
index a9bbaff..8aa22a2 100644
--- a/src/network/protocol.h
+++ b/src/network/protocol.h
@@ -1,5 +1,7 @@
#pragma once
+#include <erebos/storage.h>
+
#include <netinet/in.h>
#include <cstdint>
@@ -7,10 +9,12 @@
#include <mutex>
#include <variant>
#include <vector>
+#include <optional>
namespace erebos {
using std::mutex;
+using std::optional;
using std::unique_ptr;
using std::variant;
using std::vector;
@@ -28,6 +32,8 @@ public:
class Connection;
+ struct Header;
+
struct NewConnection;
struct ConnectionReadReady;
struct ProtocolClosed {};
@@ -85,4 +91,34 @@ private:
struct NetworkProtocol::NewConnection { Connection conn; };
struct NetworkProtocol::ConnectionReadReady { Connection::Id id; };
+struct NetworkProtocol::Header
+{
+ enum class Type {
+ Acknowledged,
+ DataRequest,
+ DataResponse,
+ AnnounceSelf,
+ AnnounceUpdate,
+ ChannelRequest,
+ ChannelAccept,
+ ServiceType,
+ ServiceRef,
+ };
+
+ struct Item {
+ const Type type;
+ const variant<PartialRef, UUID> value;
+
+ bool operator==(const Item &) const;
+ bool operator!=(const Item & other) const { return !(*this == other); }
+ };
+
+ Header(const vector<Item> & items): items(items) {}
+ static optional<Header> load(const PartialRef &);
+ static optional<Header> load(const PartialObject &);
+ PartialObject toObject() const;
+
+ const vector<Item> items;
+};
+
}