summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/network.cpp51
-rw-r--r--src/network.h4
2 files changed, 40 insertions, 15 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 8ee61b3..2db450c 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -14,6 +14,7 @@
#include <unistd.h>
using std::holds_alternative;
+using std::move;
using std::runtime_error;
using std::scoped_lock;
using std::to_string;
@@ -148,15 +149,14 @@ bool Peer::send(UUID uuid, const Object & obj) const
bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
{
- if (hasChannel())
- if (auto speer = p->speer.lock()) {
- TransportHeader header({
- { TransportHeader::Type::ServiceType, uuid },
- { TransportHeader::Type::ServiceRef, ref },
- });
- speer->send(header, { obj });
- return true;
- }
+ if (auto speer = p->speer.lock()) {
+ TransportHeader header({
+ { TransportHeader::Type::ServiceType, uuid },
+ { TransportHeader::Type::ServiceRef, ref },
+ });
+ speer->send(header, { obj }, true);
+ return true;
+ }
return false;
}
@@ -326,7 +326,9 @@ void Server::Priv::doListen()
peer.updateService(reply);
if (!reply.header().empty())
- peer.send(TransportHeader(reply.header()), reply.body());
+ peer.send(TransportHeader(reply.header()), reply.body(), false);
+
+ peer.trySendOutQueue();
}
} else {
std::cerr << "invalid packet\n";
@@ -541,12 +543,12 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
});
for (const auto & peer : peers)
- peer->send(header, { **self.ref() });
+ peer->send(header, { **self.ref() }, false);
}
}
}
-void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs) const
+void Server::Peer::send(const TransportHeader & header, const vector<Object> & objs, bool secure)
{
vector<uint8_t> data, part, out;
@@ -559,11 +561,14 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o
if (holds_alternative<Stored<Channel>>(channel))
out = std::get<Stored<Channel>>(channel)->encrypt(data);
+ else if (secure)
+ secureOutQueue.emplace_back(move(data));
else
out = std::move(data);
- sendto(server.sock, out.data(), out.size(), 0,
- (sockaddr *) &addr, sizeof(addr));
+ if (!out.empty())
+ sendto(server.sock, out.data(), out.size(), 0,
+ (sockaddr *) &addr, sizeof(addr));
}
void Server::Peer::updateIdentity(ReplyBuilder & reply)
@@ -647,6 +652,24 @@ void Server::Peer::updateService(ReplyBuilder & reply)
serviceQueue = std::move(next);
}
+void Server::Peer::trySendOutQueue()
+{
+ if (secureOutQueue.empty())
+ return;
+
+ if (!holds_alternative<Stored<Channel>>(channel))
+ return;
+
+ for (const auto & data : secureOutQueue) {
+ auto out = std::get<Stored<Channel>>(channel)->encrypt(data);
+
+ sendto(server.sock, out.data(), out.size(), 0,
+ (sockaddr *) &addr, sizeof(addr));
+ }
+
+ secureOutQueue.clear();
+}
+
void ReplyBuilder::header(TransportHeader::Item && item)
{
diff --git a/src/network.h b/src/network.h
index fe7d7b4..c4f3d6f 100644
--- a/src/network.h
+++ b/src/network.h
@@ -56,13 +56,15 @@ struct Server::Peer
PartialStorage partStorage;
vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {};
+ vector<vector<uint8_t>> secureOutQueue {};
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
- void send(const struct TransportHeader &, const vector<Object> &) const;
+ void send(const struct TransportHeader &, const vector<Object> &, bool secure);
void updateIdentity(ReplyBuilder &);
void updateChannel(ReplyBuilder &);
void updateService(ReplyBuilder &);
+ void trySendOutQueue();
};
struct Peer::Priv : enable_shared_from_this<Peer::Priv>