summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp229
1 files changed, 45 insertions, 184 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 786e752..ef06d6e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -111,11 +111,11 @@ void Server::addPeer(const string & node, const string & service) const
if (rp->ai_family == AF_INET6) {
Peer & peer = p->getPeer(*(sockaddr_in6 *)rp->ai_addr);
- vector<TransportHeader::Item> header;
+ vector<NetworkProtocol::Header::Item> header;
{
shared_lock lock(p->selfMutex);
- header.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceSelf, *p->self.ref() });
+ header.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() });
}
peer.send(header, {}, false);
return;
@@ -232,9 +232,9 @@ bool Peer::send(UUID uuid, const Object & obj) const
bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
{
if (auto speer = p->speer.lock()) {
- TransportHeader header({
- { TransportHeader::Type::ServiceType, uuid },
- { TransportHeader::Type::ServiceRef, ref },
+ NetworkProtocol::Header header({
+ { NetworkProtocol::Header::Type::ServiceType, uuid },
+ { NetworkProtocol::Header::Type::ServiceRef, ref },
});
speer->send(header, { obj }, true);
return true;
@@ -411,7 +411,7 @@ void Server::Priv::doListen()
if (auto dec = PartialObject::decodePrefix(peer->partStorage,
current->begin(), current->end())) {
- if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) {
+ if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) {
auto pos = std::get<1>(*dec);
while (auto cdec = PartialObject::decodePrefix(peer->partStorage,
pos, current->end())) {
@@ -430,7 +430,7 @@ void Server::Priv::doListen()
peer->updateService(reply);
if (!reply.header().empty())
- peer->send(TransportHeader(reply.header()), reply.body(), false);
+ peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false);
peer->trySendOutQueue();
}
@@ -450,8 +450,8 @@ void Server::Priv::doAnnounce()
if (lastAnnounce + announceInterval < now) {
shared_lock slock(selfMutex);
- TransportHeader header({
- { TransportHeader::Type::AnnounceSelf, *self.ref() }
+ NetworkProtocol::Header header({
+ { NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() }
});
vector<uint8_t> bytes = header.toObject().encode();
@@ -534,7 +534,7 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)
return *peer;
}
-void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & header, ReplyBuilder & reply)
+void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Header & header, ReplyBuilder & reply)
{
unordered_set<Digest> plaintextRefs;
for (const auto & obj : collectStoredObjects(Stored<Object>::load(*self.ref())))
@@ -544,7 +544,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
for (auto & item : header.items) {
switch (item.type) {
- case TransportHeader::Type::Acknowledged:
+ case NetworkProtocol::Header::Type::Acknowledged:
if (auto pref = std::get<PartialRef>(item.value)) {
if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest())
@@ -553,22 +553,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::DataRequest: {
+ case NetworkProtocol::Header::Type::DataRequest: {
auto pref = std::get<PartialRef>(item.value);
if (holds_alternative<unique_ptr<Channel>>(peer.channel) ||
plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
- TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- reply.header({ TransportHeader::Type::DataResponse, *ref });
+ NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref };
+ reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref });
reply.body(*ref);
}
}
break;
}
- case TransportHeader::Type::DataResponse:
+ case NetworkProtocol::Header::Type::DataResponse:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
for (auto & pwref : waiting) {
if (auto wref = pwref.lock()) {
if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
@@ -583,13 +583,13 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::AnnounceSelf: {
+ case NetworkProtocol::Header::Type::AnnounceSelf: {
auto pref = std::get<PartialRef>(item.value);
if (pref.digest() == self.ref()->digest())
break;
if (holds_alternative<monostate>(peer.identity)) {
- reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+ reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()});
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -604,10 +604,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
}
- case TransportHeader::Type::AnnounceUpdate:
+ case NetworkProtocol::Header::Type::AnnounceUpdate:
if (holds_alternative<Identity>(peer.identity)) {
auto pref = std::get<PartialRef>(item.value);
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -621,9 +621,9 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::ChannelRequest:
+ case NetworkProtocol::Header::Type::ChannelRequest:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest())
@@ -644,7 +644,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
}
break;
- case TransportHeader::Type::ChannelAccept:
+ case NetworkProtocol::Header::Type::ChannelAccept:
if (auto pref = std::get<PartialRef>(item.value)) {
if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest())
@@ -655,22 +655,22 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto acc = ChannelAccept::load(*r);
if (holds_alternative<Identity>(peer.identity) &&
acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) {
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
peer.finalizeChannel(reply, acc.data->channel());
}
}
}
break;
- case TransportHeader::Type::ServiceType:
+ case NetworkProtocol::Header::Type::ServiceType:
if (!serviceType)
serviceType = std::get<UUID>(item.value);
break;
- case TransportHeader::Type::ServiceRef:
+ case NetworkProtocol::Header::Type::ServiceRef:
if (!serviceType)
for (auto & item : header.items)
- if (item.type == TransportHeader::Type::ServiceType) {
+ if (item.type == NetworkProtocol::Header::Type::ServiceType) {
serviceType = std::get<UUID>(item.value);
break;
}
@@ -679,7 +679,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto pref = std::get<PartialRef>(item.value);
if (pref)
- reply.header({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -703,15 +703,15 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
if (*id != self) {
self = *id;
- vector<TransportHeader::Item> hitems;
+ vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : self.refs())
- hitems.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ hitems.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
for (const auto & r : self.updates())
- hitems.push_back(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ hitems.push_back(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
- TransportHeader header(hitems);
+ NetworkProtocol::Header header(hitems);
for (const auto & peer : peers)
peer->send(header, { **self.ref() }, false);
@@ -719,7 +719,7 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
}
}
-void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs, bool secure)
+void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure)
{
vector<uint8_t> data, part, out;
@@ -786,7 +786,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
channel.emplace<Stored<ChannelRequest>>(req);
- reply.header({ TransportHeader::Type::ChannelRequest, req.ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() });
reply.body(req.ref());
reply.body(req->data.ref());
reply.body(req->data->key.ref());
@@ -801,7 +801,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
req->isSignedBy(std::get<Identity>(identity).keyMessage())) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {
channel.emplace<Stored<ChannelAccept>>(*acc);
- reply.header({ TransportHeader::Type::ChannelAccept, acc->ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() });
reply.body(acc->ref());
reply.body(acc.value()->data.ref());
reply.body(acc.value()->data->key.ref());
@@ -821,13 +821,13 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)
{
channel.emplace<unique_ptr<Channel>>(move(ch));
- vector<TransportHeader::Item> hitems;
+ vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : server.self.refs())
- reply.header(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ reply.header(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
for (const auto & r : server.self.updates())
- reply.header(TransportHeader::Item {
- TransportHeader::Type::AnnounceUpdate, r });
+ reply.header(NetworkProtocol::Header::Item {
+ NetworkProtocol::Header::Type::AnnounceUpdate, r });
}
void Server::Peer::updateService(ReplyBuilder & reply)
@@ -881,7 +881,7 @@ void Server::Peer::trySendOutQueue()
}
-void ReplyBuilder::header(TransportHeader::Item && item)
+void ReplyBuilder::header(NetworkProtocol::Header::Item && item)
{
for (const auto & x : mheader)
if (x == item)
@@ -926,146 +926,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply)
return r;
for (const auto & d : missing)
- reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+ reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) });
return nullopt;
}
-
-
-bool TransportHeader::Item::operator==(const Item & other) const
-{
- if (type != other.type)
- return false;
-
- if (value.index() != other.value.index())
- return false;
-
- if (holds_alternative<PartialRef>(value))
- return std::get<PartialRef>(value).digest() ==
- std::get<PartialRef>(other.value).digest();
-
- if (holds_alternative<UUID>(value))
- return std::get<UUID>(value) == std::get<UUID>(other.value);
-
- throw runtime_error("unhandled transport header item type");
-}
-
-optional<TransportHeader> TransportHeader::load(const PartialRef & ref)
-{
- return load(*ref);
-}
-
-optional<TransportHeader> TransportHeader::load(const PartialObject & obj)
-{
- auto rec = obj.asRecord();
- if (!rec)
- return nullopt;
-
- vector<Item> items;
- for (const auto & item : rec->items()) {
- if (item.name == "ACK") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::Acknowledged,
- .value = *ref,
- });
- } else if (item.name == "REQ") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::DataRequest,
- .value = *ref,
- });
- } else if (item.name == "RSP") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::DataResponse,
- .value = *ref,
- });
- } else if (item.name == "ANN") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::AnnounceSelf,
- .value = *ref,
- });
- } else if (item.name == "ANU") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::AnnounceUpdate,
- .value = *ref,
- });
- } else if (item.name == "CRQ") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ChannelRequest,
- .value = *ref,
- });
- } else if (item.name == "CAC") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ChannelAccept,
- .value = *ref,
- });
- } else if (item.name == "STP") {
- if (auto val = item.asUUID())
- items.emplace_back(Item {
- .type = Type::ServiceType,
- .value = *val,
- });
- } else if (item.name == "SRF") {
- if (auto ref = item.asRef())
- items.emplace_back(Item {
- .type = Type::ServiceRef,
- .value = *ref,
- });
- }
- }
-
- return TransportHeader(items);
-}
-
-PartialObject TransportHeader::toObject() const
-{
- vector<PartialRecord::Item> ritems;
-
- for (const auto & item : items) {
- switch (item.type) {
- case Type::Acknowledged:
- ritems.emplace_back("ACK", std::get<PartialRef>(item.value));
- break;
-
- case Type::DataRequest:
- ritems.emplace_back("REQ", std::get<PartialRef>(item.value));
- break;
-
- case Type::DataResponse:
- ritems.emplace_back("RSP", std::get<PartialRef>(item.value));
- break;
-
- case Type::AnnounceSelf:
- ritems.emplace_back("ANN", std::get<PartialRef>(item.value));
- break;
-
- case Type::AnnounceUpdate:
- ritems.emplace_back("ANU", std::get<PartialRef>(item.value));
- break;
-
- case Type::ChannelRequest:
- ritems.emplace_back("CRQ", std::get<PartialRef>(item.value));
- break;
-
- case Type::ChannelAccept:
- ritems.emplace_back("CAC", std::get<PartialRef>(item.value));
- break;
-
- case Type::ServiceType:
- ritems.emplace_back("STP", std::get<UUID>(item.value));
- break;
-
- case Type::ServiceRef:
- ritems.emplace_back("SRF", std::get<PartialRef>(item.value));
- break;
- }
- }
-
- return PartialObject(PartialRecord(std::move(ritems)));
-}