summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp80
1 files changed, 53 insertions, 27 deletions
diff --git a/src/network.cpp b/src/network.cpp
index f3a3651..9a9dc41 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -209,8 +209,8 @@ void Server::Priv::doListen()
peer.updateIdentity(reply);
peer.updateChannel(reply);
- if (!reply.header.empty())
- peer.send(TransportHeader(reply.header), reply.body);
+ if (!reply.header().empty())
+ peer.send(TransportHeader(reply.header()), reply.body());
}
} else {
std::cerr << "invalid packet\n";
@@ -292,8 +292,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- reply.header.push_back({ TransportHeader::Type::DataResponse, *ref });
- reply.body.push_back(**ref);
+ reply.header({ TransportHeader::Type::DataResponse, *ref });
+ reply.body(*ref);
}
}
break;
@@ -301,12 +301,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
case TransportHeader::Type::DataResponse:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
for (auto & pwref : waiting) {
if (auto wref = pwref.lock()) {
if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
wref->missing.end()) {
- if (wref->check(&reply.header))
+ if (wref->check(reply))
pwref.reset();
}
}
@@ -322,7 +322,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
if (holds_alternative<monostate>(peer.identity))
- reply.header.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+ reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()});
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -332,7 +332,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
});
waiting.push_back(wref);
peer.identity = wref;
- wref->check(&reply.header);
+ wref->check(reply);
break;
}
@@ -341,7 +341,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
case TransportHeader::Type::ChannelRequest:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
std::get<Stored<ChannelRequest>>(peer.channel).ref.digest() < pref.digest())
@@ -358,7 +358,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
});
waiting.push_back(wref);
peer.channel = wref;
- wref->check(&reply.header);
+ wref->check(reply);
}
break;
@@ -371,7 +371,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto cres = peer.tempStorage.copy(pref);
if (auto r = std::get_if<Ref>(&cres)) {
if (auto acc = ChannelAccept::load(*r)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
peer.channel.emplace<Stored<Channel>>(acc->data->channel());
}
}
@@ -411,7 +411,7 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o
void Server::Peer::updateIdentity(ReplyBuilder & reply)
{
if (holds_alternative<shared_ptr<WaitingRef>>(identity))
- if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(&reply.header))
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply))
if (auto id = Identity::load(*ref)) {
identity.emplace<Identity>(*id);
if (lpeer)
@@ -428,25 +428,25 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
channel.emplace<Stored<ChannelRequest>>(req);
- reply.header.push_back({ TransportHeader::Type::ChannelRequest, req.ref });
- reply.body.push_back(*req.ref);
- reply.body.push_back(*req->data.ref);
- reply.body.push_back(*req->data->key.ref);
+ reply.header({ TransportHeader::Type::ChannelRequest, req.ref });
+ reply.body(req.ref);
+ reply.body(req->data.ref);
+ reply.body(req->data->key.ref);
for (const auto & sig : req->sigs)
- reply.body.push_back(*sig.ref);
+ reply.body(sig.ref);
}
if (holds_alternative<shared_ptr<WaitingRef>>(channel)) {
- if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(&reply.header)) {
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) {
if (auto req = Stored<ChannelRequest>::load(*ref)) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), *req)) {
channel.emplace<Stored<ChannelAccept>>(*acc);
- reply.header.push_back({ TransportHeader::Type::ChannelAccept, acc->ref });
- reply.body.push_back(*acc->ref);
- reply.body.push_back(*acc.value()->data.ref);
- reply.body.push_back(*acc.value()->data->key.ref);
+ reply.header({ TransportHeader::Type::ChannelAccept, acc->ref });
+ reply.body(acc->ref);
+ reply.body(acc.value()->data.ref);
+ reply.body(acc.value()->data->key.ref);
for (const auto & sig : acc.value()->sigs)
- reply.body.push_back(*sig.ref);
+ reply.body(sig.ref);
} else {
channel = monostate();
}
@@ -457,7 +457,34 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
}
}
-optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
+
+void ReplyBuilder::header(TransportHeader::Item && item)
+{
+ for (const auto & x : mheader)
+ if (x.type == item.type && x.value == item.value)
+ return;
+ mheader.emplace_back(std::move(item));
+}
+
+void ReplyBuilder::body(const Ref & ref)
+{
+ for (const auto & x : mbody)
+ if (x.digest() == ref.digest())
+ return;
+ mbody.push_back(ref);
+}
+
+vector<Object> ReplyBuilder::body() const
+{
+ vector<Object> res;
+ res.reserve(mbody.size());
+ for (const Ref & ref : mbody)
+ res.push_back(*ref);
+ return res;
+}
+
+
+optional<Ref> WaitingRef::check(ReplyBuilder & reply)
{
if (auto r = storage.ref(ref.digest()))
return *r;
@@ -467,9 +494,8 @@ optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
return *r;
missing = std::get<vector<Digest>>(res);
- if (request)
- for (const auto & d : missing)
- request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+ for (const auto & d : missing)
+ reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
return nullopt;
}