diff options
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 97 |
1 files changed, 15 insertions, 82 deletions
diff --git a/src/network.cpp b/src/network.cpp index 455496c..8c181cf 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -117,7 +117,7 @@ void Server::addPeer(const string & node, const string & service) const header.push_back(NetworkProtocol::Header::Item { NetworkProtocol::Header::Type::AnnounceSelf, p->self.ref()->digest() }); } - peer.send(header, {}, false); + peer.connection.send(peer.partStorage, header, {}, false); return; } } @@ -229,7 +229,7 @@ bool Peer::send(UUID uuid, const Ref & ref, const Object & obj) const { NetworkProtocol::Header::Type::ServiceType, uuid }, { NetworkProtocol::Header::Type::ServiceRef, ref.digest() }, }); - speer->send(header, { obj }, true); + speer->connection.send(speer->partStorage, header, { obj }, true); return true; } @@ -360,7 +360,6 @@ shared_ptr<Server::Priv> Server::Priv::getptr() void Server::Priv::doListen() { - vector<uint8_t> buf, decrypted, *current; unique_lock lock(dataMutex); for (; !finish; lock.lock()) { @@ -385,50 +384,22 @@ void Server::Priv::doListen() if (!peer) continue; - if (not peer->connection.receive(buf)) - continue; - - current = &buf; - if (holds_alternative<unique_ptr<Channel>>(peer->connection.channel())) { - if (auto dec = std::get<unique_ptr<Channel>>(peer->connection.channel())->decrypt(buf)) { - decrypted = std::move(*dec); - current = &decrypted; - } - } else if (holds_alternative<Stored<ChannelAccept>>(peer->connection.channel())) { - if (auto dec = std::get<Stored<ChannelAccept>>(peer->connection.channel())-> - data->channel()->decrypt(buf)) { - decrypted = std::move(*dec); - current = &decrypted; - } - } - - if (auto dec = PartialObject::decodePrefix(peer->partStorage, - current->begin(), current->end())) { - if (auto header = NetworkProtocol::Header::load(std::get<PartialObject>(*dec))) { - auto pos = std::get<1>(*dec); - while (auto cdec = PartialObject::decodePrefix(peer->partStorage, - pos, current->end())) { - peer->partStorage.storeObject(std::get<PartialObject>(*cdec)); - pos = std::get<1>(*cdec); - } + if (auto header = peer->connection.receive(peer->partStorage)) { + ReplyBuilder reply; - ReplyBuilder reply; - - scoped_lock hlock(dataMutex); - shared_lock slock(selfMutex); + scoped_lock hlock(dataMutex); + shared_lock slock(selfMutex); - handlePacket(*peer, *header, reply); - peer->updateIdentity(reply); - peer->updateChannel(reply); - peer->updateService(reply); + handlePacket(*peer, *header, reply); + peer->updateIdentity(reply); + peer->updateChannel(reply); + peer->updateService(reply); - if (!reply.header().empty()) - peer->send(NetworkProtocol::Header(reply.header()), reply.body(), false); + if (!reply.header().empty()) + peer->connection.send(peer->partStorage, + NetworkProtocol::Header(reply.header()), reply.body(), false); - peer->trySendOutQueue(); - } - } else { - std::cerr << "invalid packet\n"; + peer->connection.trySendOutQueue(); } } } @@ -703,33 +674,11 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) NetworkProtocol::Header header(hitems); for (const auto & peer : peers) - peer->send(header, { **self.ref() }, false); + peer->connection.send(peer->partStorage, header, { **self.ref() }, false); } } } -void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Object> & objs, bool secure) -{ - vector<uint8_t> data, part, out; - - part = header.toObject(partStorage).encode(); - data.insert(data.end(), part.begin(), part.end()); - for (const auto & obj : objs) { - part = obj.encode(); - data.insert(data.end(), part.begin(), part.end()); - } - - if (holds_alternative<unique_ptr<Channel>>(connection.channel())) - out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data); - else if (secure) - secureOutQueue.emplace_back(move(data)); - else - out = std::move(data); - - if (!out.empty()) - connection.send(out); -} - void Server::Peer::updateIdentity(ReplyBuilder &) { if (holds_alternative<shared_ptr<WaitingRef>>(identity)) { @@ -853,22 +802,6 @@ void Server::Peer::updateService(ReplyBuilder & reply) serviceQueue = std::move(next); } -void Server::Peer::trySendOutQueue() -{ - if (secureOutQueue.empty()) - return; - - if (!holds_alternative<unique_ptr<Channel>>(connection.channel())) - return; - - for (const auto & data : secureOutQueue) { - auto out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data); - connection.send(out); - } - - secureOutQueue.clear(); -} - void ReplyBuilder::header(NetworkProtocol::Header::Item && item) { |