summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp97
1 files changed, 15 insertions, 82 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 455496c..8c181cf 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -117,7 +117,7 @@ void Server::addPeer(const string & node, const string & service) const
header.push_back(NetworkProtocol::Header::Item {
NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() });
}
- peer.send(header, {}, false);
+ peer.connection.send(peer.partStorage, header, {}, false);
return;
}
}
@@ -229,7 +229,7 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const
{ NetworkProtocol::Header::Type::ServiceType, uuid },
{ NetworkProtocol::Header::Type::ServiceRef, ref.digest() },
});
- speer->send(header, { obj }, true);
+ speer->connection.send(speer->partStorage, header, { obj }, true);
return true;
}
@@ -360,7 +360,6 @@ shared_ptr<Server::Priv> Server::Priv::getptr()
void Server::Priv::doListen()
{
- vector<uint8_t> buf, decrypted, *current;
unique_lock lock(dataMutex);
for (; !finish; lock.lock()) {
@@ -385,50 +384,22 @@ void Server::Priv::doListen()
if (!peer)
continue;
- if (not peer->connection.receive(buf))
- continue;
-
- current = &buf;
- if (holds_alternative<unique_ptr<Channel>>(peer->connection.channel())) {
- if (auto dec = std::get<unique_ptr<Channel>>(peer->connection.channel())->decrypt(buf)) {
- decrypted = std::move(*dec);
- current = &decrypted;
- }
- } else if (holds_alternative<Stored<ChannelAccept>>(peer->connection.channel())) {
- if (auto dec = std::get<Stored<ChannelAccept>>(peer->connection.channel())->
- data->channel()->decrypt(buf)) {
- decrypted = std::move(*dec);
- current = &decrypted;
- }
- }
-
- if (auto dec = PartialObject::decodePrefix(peer->partStorage,
- current->begin(), current->end())) {
- if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) {
- auto pos = std::get<1>(*dec);
- while (auto cdec = PartialObject::decodePrefix(peer->partStorage,
- pos, current->end())) {
- peer->partStorage.storeObject(std::get<PartialObject>(*cdec));
- pos = std::get<1>(*cdec);
- }
+ if (auto header = peer->connection.receive(peer->partStorage)) {
+ ReplyBuilder reply;
- ReplyBuilder reply;
-
- scoped_lock hlock(dataMutex);
- shared_lock slock(selfMutex);
+ scoped_lock hlock(dataMutex);
+ shared_lock slock(selfMutex);
- handlePacket(*peer, *header, reply);
- peer->updateIdentity(reply);
- peer->updateChannel(reply);
- peer->updateService(reply);
+ handlePacket(*peer, *header, reply);
+ peer->updateIdentity(reply);
+ peer->updateChannel(reply);
+ peer->updateService(reply);
- if (!reply.header().empty())
- peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false);
+ if (!reply.header().empty())
+ peer->connection.send(peer->partStorage,
+ NetworkProtocol::Header(reply.header()), reply.body(), false);
- peer->trySendOutQueue();
- }
- } else {
- std::cerr << "invalid packet\n";
+ peer->connection.trySendOutQueue();
}
}
}
@@ -703,33 +674,11 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
NetworkProtocol::Header header(hitems);
for (const auto & peer : peers)
- peer->send(header, { **self.ref() }, false);
+ peer->connection.send(peer->partStorage, header, { **self.ref() }, false);
}
}
}
-void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure)
-{
- vector<uint8_t> data, part, out;
-
- part = header.toObject(partStorage).encode();
- data.insert(data.end(), part.begin(), part.end());
- for (const auto & obj : objs) {
- part = obj.encode();
- data.insert(data.end(), part.begin(), part.end());
- }
-
- if (holds_alternative<unique_ptr<Channel>>(connection.channel()))
- out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);
- else if (secure)
- secureOutQueue.emplace_back(move(data));
- else
- out = std::move(data);
-
- if (!out.empty())
- connection.send(out);
-}
-
void Server::Peer::updateIdentity(ReplyBuilder &)
{
if (holds_alternative<shared_ptr<WaitingRef>>(identity)) {
@@ -853,22 +802,6 @@ void Server::Peer::updateService(ReplyBuilder & reply)
serviceQueue = std::move(next);
}
-void Server::Peer::trySendOutQueue()
-{
- if (secureOutQueue.empty())
- return;
-
- if (!holds_alternative<unique_ptr<Channel>>(connection.channel()))
- return;
-
- for (const auto & data : secureOutQueue) {
- auto out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);
- connection.send(out);
- }
-
- secureOutQueue.clear();
-}
-
void ReplyBuilder::header(NetworkProtocol::Header::Item && item)
{