summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/network.cpp80
-rw-r--r--src/network.h28
2 files changed, 72 insertions, 36 deletions
diff --git a/src/network.cpp b/src/network.cpp
index f3a3651..9a9dc41 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -209,8 +209,8 @@ void Server::Priv::doListen()
peer.updateIdentity(reply);
peer.updateChannel(reply);
- if (!reply.header.empty())
- peer.send(TransportHeader(reply.header), reply.body);
+ if (!reply.header().empty())
+ peer.send(TransportHeader(reply.header()), reply.body());
}
} else {
std::cerr << "invalid packet\n";
@@ -292,8 +292,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- reply.header.push_back({ TransportHeader::Type::DataResponse, *ref });
- reply.body.push_back(**ref);
+ reply.header({ TransportHeader::Type::DataResponse, *ref });
+ reply.body(*ref);
}
}
break;
@@ -301,12 +301,12 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
case TransportHeader::Type::DataResponse:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
for (auto & pwref : waiting) {
if (auto wref = pwref.lock()) {
if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
wref->missing.end()) {
- if (wref->check(&reply.header))
+ if (wref->check(reply))
pwref.reset();
}
}
@@ -322,7 +322,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
break;
if (holds_alternative<monostate>(peer.identity))
- reply.header.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+ reply.header({ TransportHeader::Type::AnnounceSelf, *self.ref()});
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
@@ -332,7 +332,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
});
waiting.push_back(wref);
peer.identity = wref;
- wref->check(&reply.header);
+ wref->check(reply);
break;
}
@@ -341,7 +341,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
case TransportHeader::Type::ChannelRequest:
if (auto pref = std::get<PartialRef>(item.value)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
std::get<Stored<ChannelRequest>>(peer.channel).ref.digest() < pref.digest())
@@ -358,7 +358,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
});
waiting.push_back(wref);
peer.channel = wref;
- wref->check(&reply.header);
+ wref->check(reply);
}
break;
@@ -371,7 +371,7 @@ void Server::Priv::handlePacket(Server::Peer & peer, const TransportHeader & hea
auto cres = peer.tempStorage.copy(pref);
if (auto r = std::get_if<Ref>(&cres)) {
if (auto acc = ChannelAccept::load(*r)) {
- reply.header.push_back({ TransportHeader::Type::Acknowledged, pref });
+ reply.header({ TransportHeader::Type::Acknowledged, pref });
peer.channel.emplace<Stored<Channel>>(acc->data->channel());
}
}
@@ -411,7 +411,7 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o
void Server::Peer::updateIdentity(ReplyBuilder & reply)
{
if (holds_alternative<shared_ptr<WaitingRef>>(identity))
- if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(&reply.header))
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(reply))
if (auto id = Identity::load(*ref)) {
identity.emplace<Identity>(*id);
if (lpeer)
@@ -428,25 +428,25 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
channel.emplace<Stored<ChannelRequest>>(req);
- reply.header.push_back({ TransportHeader::Type::ChannelRequest, req.ref });
- reply.body.push_back(*req.ref);
- reply.body.push_back(*req->data.ref);
- reply.body.push_back(*req->data->key.ref);
+ reply.header({ TransportHeader::Type::ChannelRequest, req.ref });
+ reply.body(req.ref);
+ reply.body(req->data.ref);
+ reply.body(req->data->key.ref);
for (const auto & sig : req->sigs)
- reply.body.push_back(*sig.ref);
+ reply.body(sig.ref);
}
if (holds_alternative<shared_ptr<WaitingRef>>(channel)) {
- if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(&reply.header)) {
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) {
if (auto req = Stored<ChannelRequest>::load(*ref)) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), *req)) {
channel.emplace<Stored<ChannelAccept>>(*acc);
- reply.header.push_back({ TransportHeader::Type::ChannelAccept, acc->ref });
- reply.body.push_back(*acc->ref);
- reply.body.push_back(*acc.value()->data.ref);
- reply.body.push_back(*acc.value()->data->key.ref);
+ reply.header({ TransportHeader::Type::ChannelAccept, acc->ref });
+ reply.body(acc->ref);
+ reply.body(acc.value()->data.ref);
+ reply.body(acc.value()->data->key.ref);
for (const auto & sig : acc.value()->sigs)
- reply.body.push_back(*sig.ref);
+ reply.body(sig.ref);
} else {
channel = monostate();
}
@@ -457,7 +457,34 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
}
}
-optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
+
+void ReplyBuilder::header(TransportHeader::Item && item)
+{
+ for (const auto & x : mheader)
+ if (x.type == item.type && x.value == item.value)
+ return;
+ mheader.emplace_back(std::move(item));
+}
+
+void ReplyBuilder::body(const Ref & ref)
+{
+ for (const auto & x : mbody)
+ if (x.digest() == ref.digest())
+ return;
+ mbody.push_back(ref);
+}
+
+vector<Object> ReplyBuilder::body() const
+{
+ vector<Object> res;
+ res.reserve(mbody.size());
+ for (const Ref & ref : mbody)
+ res.push_back(*ref);
+ return res;
+}
+
+
+optional<Ref> WaitingRef::check(ReplyBuilder & reply)
{
if (auto r = storage.ref(ref.digest()))
return *r;
@@ -467,9 +494,8 @@ optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
return *r;
missing = std::get<vector<Digest>>(res);
- if (request)
- for (const auto & d : missing)
- request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+ for (const auto & d : missing)
+ reply.header({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
return nullopt;
}
diff --git a/src/network.h b/src/network.h
index 07b5363..13bb031 100644
--- a/src/network.h
+++ b/src/network.h
@@ -30,6 +30,8 @@ using chrono::steady_clock;
namespace erebos {
+class ReplyBuilder;
+
struct Server::Peer
{
Peer(const Peer &) = delete;
@@ -54,8 +56,8 @@ struct Server::Peer
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
void send(const struct TransportHeader &, const vector<Object> &) const;
- void updateIdentity(struct ReplyBuilder &);
- void updateChannel(struct ReplyBuilder &);
+ void updateIdentity(ReplyBuilder &);
+ void updateChannel(ReplyBuilder &);
};
struct Peer::Priv : enable_shared_from_this<Peer::Priv>
@@ -103,6 +105,20 @@ struct TransportHeader
const vector<Item> items;
};
+class ReplyBuilder
+{
+public:
+ void header(TransportHeader::Item &&);
+ void body(const Ref &);
+
+ const vector<TransportHeader::Item> & header() const { return mheader; }
+ vector<Object> body() const;
+
+private:
+ vector<TransportHeader::Item> mheader;
+ vector<Ref> mbody;
+};
+
struct WaitingRef
{
const Storage storage;
@@ -110,13 +126,7 @@ struct WaitingRef
const Server::Peer & peer;
vector<Digest> missing;
- optional<Ref> check(vector<TransportHeader::Item> * request = nullptr);
-};
-
-struct ReplyBuilder
-{
- vector<TransportHeader::Item> header;
- vector<Object> body;
+ optional<Ref> check(ReplyBuilder &);
};
struct Server::Priv