summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp156
1 files changed, 79 insertions, 77 deletions
diff --git a/src/network.cpp b/src/network.cpp
index ef06d6e..725ac50 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -115,7 +115,7 @@ void Server::addPeer(const string & node, const string & service) const
{
shared_lock lock(p->selfMutex);
header.push_back(NetworkProtocol::Header::Item {
- NetworkProtocol::Header::Type::AnnounceSelf, *p->self.ref() });
+ NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() });
}
peer.send(header, {}, false);
return;
@@ -234,7 +234,7 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
if (auto speer = p->speer.lock()) {
NetworkProtocol::Header header({
{ NetworkProtocol::Header::Type::ServiceType, uuid },
- { NetworkProtocol::Header::Type::ServiceRef, ref },
+ { NetworkProtocol::Header::Type::ServiceRef, ref.digest() },
});
speer->send(header, { obj }, true);
return true;
@@ -442,6 +442,8 @@ void Server::Priv::doListen()
void Server::Priv::doAnnounce()
{
+ auto pst = self.ref()->storage().derivePartialStorage();
+
unique_lock lock(dataMutex);
auto lastAnnounce = steady_clock::now() - announceInterval;
@@ -451,10 +453,10 @@ void Server::Priv::doAnnounce()
if (lastAnnounce + announceInterval < now) {
shared_lock slock(selfMutex);
NetworkProtocol::Header header({
- { NetworkProtocol::Header::Type::AnnounceSelf, *self.ref() }
+ { NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest() }
});
- vector<uint8_t> bytes = header.toObject().encode();
+ vector<uint8_t> bytes = header.toObject(pst).encode();
for (const auto & in : bcastAddresses) {
sockaddr_in sin = {};
@@ -544,56 +546,55 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
for (auto & item : header.items) {
switch (item.type) {
- case NetworkProtocol::Header::Type::Acknowledged:
- if (auto pref = std::get<PartialRef>(item.value)) {
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
- std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == pref.digest())
- peer.finalizeChannel(reply,
- std::get<Stored<ChannelAccept>>(peer.channel)->data->channel());
- }
+ case NetworkProtocol::Header::Type::Acknowledged: {
+ auto dgst = std::get<Digest>(item.value);
+ if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
+ std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst)
+ peer.finalizeChannel(reply,
+ std::get<Stored<ChannelAccept>>(peer.channel)->data->channel());
break;
+ }
case NetworkProtocol::Header::Type::DataRequest: {
- auto pref = std::get<PartialRef>(item.value);
+ auto dgst = std::get<Digest>(item.value);
if (holds_alternative<unique_ptr<Channel>>(peer.channel) ||
- plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
- if (auto ref = peer.tempStorage.ref(pref.digest())) {
- NetworkProtocol::Header::Item hitem { NetworkProtocol::Header::Type::DataResponse, *ref };
- reply.header({ NetworkProtocol::Header::Type::DataResponse, *ref });
+ plaintextRefs.find(dgst) != plaintextRefs.end()) {
+ if (auto ref = peer.tempStorage.ref(dgst)) {
+ reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() });
reply.body(*ref);
}
}
break;
}
- case NetworkProtocol::Header::Type::DataResponse:
- if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
- for (auto & pwref : waiting) {
- if (auto wref = pwref.lock()) {
- if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
- wref->missing.end()) {
- if (wref->check(reply))
- pwref.reset();
- }
+ case NetworkProtocol::Header::Type::DataResponse: {
+ auto dgst = std::get<Digest>(item.value);
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
+ for (auto & pwref : waiting) {
+ if (auto wref = pwref.lock()) {
+ if (std::find(wref->missing.begin(), wref->missing.end(), dgst) !=
+ wref->missing.end()) {
+ if (wref->check(reply))
+ pwref.reset();
}
}
- waiting.erase(std::remove_if(waiting.begin(), waiting.end(),
- [](auto & wref) { return wref.expired(); }), waiting.end());
}
+ waiting.erase(std::remove_if(waiting.begin(), waiting.end(),
+ [](auto & wref) { return wref.expired(); }), waiting.end());
break;
+ }
case NetworkProtocol::Header::Type::AnnounceSelf: {
- auto pref = std::get<PartialRef>(item.value);
- if (pref.digest() == self.ref()->digest())
+ auto dgst = std::get<Digest>(item.value);
+ if (dgst == self.ref()->digest())
break;
if (holds_alternative<monostate>(peer.identity)) {
- reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, *self.ref()});
+ reply.header({ NetworkProtocol::Header::Type::AnnounceSelf, self.ref()->digest()});
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
- .ref = pref,
+ .ref = peer.partStorage.ref(dgst),
.peer = peer,
.missing = {},
});
@@ -606,12 +607,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
case NetworkProtocol::Header::Type::AnnounceUpdate:
if (holds_alternative<Identity>(peer.identity)) {
- auto pref = std::get<PartialRef>(item.value);
- reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
+ auto dgst = std::get<Digest>(item.value);
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
- .ref = pref,
+ .ref = peer.partStorage.ref(dgst),
.peer = peer,
.missing = {},
});
@@ -621,46 +622,46 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
}
break;
- case NetworkProtocol::Header::Type::ChannelRequest:
- if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
+ case NetworkProtocol::Header::Type::ChannelRequest: {
+ auto dgst = std::get<Digest>(item.value);
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
- if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
- std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < pref.digest())
- break;
+ if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
+ std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst)
+ break;
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel))
- break;
+ if (holds_alternative<Stored<ChannelAccept>>(peer.channel))
+ break;
- shared_ptr<WaitingRef> wref(new WaitingRef {
- .storage = peer.tempStorage,
- .ref = pref,
- .peer = peer,
- .missing = {},
- });
- waiting.push_back(wref);
- peer.channel = wref;
- wref->check(reply);
- }
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = peer.partStorage.ref(dgst),
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.channel = wref;
+ wref->check(reply);
break;
+ }
- case NetworkProtocol::Header::Type::ChannelAccept:
- if (auto pref = std::get<PartialRef>(item.value)) {
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
- std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < pref.digest())
- break;
-
- auto cres = peer.tempStorage.copy(pref);
- if (auto r = std::get_if<Ref>(&cres)) {
- auto acc = ChannelAccept::load(*r);
- if (holds_alternative<Identity>(peer.identity) &&
- acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) {
- reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
- peer.finalizeChannel(reply, acc.data->channel());
- }
+ case NetworkProtocol::Header::Type::ChannelAccept: {
+ auto dgst = std::get<Digest>(item.value);
+ if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
+ std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst)
+ break;
+
+ auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst));
+ if (auto r = std::get_if<Ref>(&cres)) {
+ auto acc = ChannelAccept::load(*r);
+ if (holds_alternative<Identity>(peer.identity) &&
+ acc.isSignedBy(std::get<Identity>(peer.identity).keyMessage())) {
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
+ peer.finalizeChannel(reply, acc.data->channel());
}
}
break;
+ }
case NetworkProtocol::Header::Type::ServiceType:
if (!serviceType)
@@ -677,9 +678,10 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
if (!serviceType)
break;
- auto pref = std::get<PartialRef>(item.value);
+ auto dgst = std::get<Digest>(item.value);
+ auto pref = peer.partStorage.ref(dgst);
if (pref)
- reply.header({ NetworkProtocol::Header::Type::Acknowledged, pref });
+ reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -706,10 +708,10 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : self.refs())
hitems.push_back(NetworkProtocol::Header::Item {
- NetworkProtocol::Header::Type::AnnounceUpdate, r });
+ NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });
for (const auto & r : self.updates())
hitems.push_back(NetworkProtocol::Header::Item {
- NetworkProtocol::Header::Type::AnnounceUpdate, r });
+ NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });
NetworkProtocol::Header header(hitems);
@@ -723,7 +725,7 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj
{
vector<uint8_t> data, part, out;
- part = header.toObject().encode();
+ part = header.toObject(partStorage).encode();
data.insert(data.end(), part.begin(), part.end());
for (const auto & obj : objs) {
part = obj.encode();
@@ -786,7 +788,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
channel.emplace<Stored<ChannelRequest>>(req);
- reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() });
reply.body(req.ref());
reply.body(req->data.ref());
reply.body(req->data->key.ref());
@@ -801,7 +803,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
req->isSignedBy(std::get<Identity>(identity).keyMessage())) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {
channel.emplace<Stored<ChannelAccept>>(*acc);
- reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref() });
+ reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() });
reply.body(acc->ref());
reply.body(acc.value()->data.ref());
reply.body(acc.value()->data->key.ref());
@@ -824,10 +826,10 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)
vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : server.self.refs())
reply.header(NetworkProtocol::Header::Item {
- NetworkProtocol::Header::Type::AnnounceUpdate, r });
+ NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });
for (const auto & r : server.self.updates())
reply.header(NetworkProtocol::Header::Item {
- NetworkProtocol::Header::Type::AnnounceUpdate, r });
+ NetworkProtocol::Header::Type::AnnounceUpdate, r.digest() });
}
void Server::Peer::updateService(ReplyBuilder & reply)
@@ -926,7 +928,7 @@ optional<Ref> WaitingRef::check(ReplyBuilder & reply)
return r;
for (const auto & d : missing)
- reply.header({ NetworkProtocol::Header::Type::DataRequest, peer.partStorage.ref(d) });
+ reply.header({ NetworkProtocol::Header::Type::DataRequest, d });
return nullopt;
}