diff options
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 87 |
1 files changed, 65 insertions, 22 deletions
diff --git a/src/network.cpp b/src/network.cpp index b5dfd68..786e752 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -175,7 +175,7 @@ optional<Identity> Peer::identity() const const sockaddr_in6 & Peer::address() const { if (auto speer = p->speer.lock()) - return speer->addr; + return speer->connection.peerAddress(); throw runtime_error("Server no longer running"); } @@ -373,36 +373,49 @@ void Server::Priv::doListen() for (; !finish; lock.lock()) { lock.unlock(); - sockaddr_in6 paddr; - if (not protocol.recvfrom(buf, paddr)) + Peer * peer = nullptr; + auto res = protocol.poll(); + + if (holds_alternative<NetworkProtocol::ProtocolClosed>(res)) break; - if (isSelfAddress(paddr)) + if (holds_alternative<NetworkProtocol::NewConnection>(res)) { + auto & conn = get<NetworkProtocol::NewConnection>(res).conn; + if (not isSelfAddress(conn.peerAddress())) + peer = &addPeer(move(conn)); + } + + if (holds_alternative<NetworkProtocol::ConnectionReadReady>(res)) { + peer = findPeer(get<NetworkProtocol::ConnectionReadReady>(res).id); + } + + if (!peer) continue; - auto & peer = getPeer(paddr); + if (not peer->connection.receive(buf)) + continue; current = &buf; - if (holds_alternative<unique_ptr<Channel>>(peer.channel)) { - if (auto dec = std::get<unique_ptr<Channel>>(peer.channel)->decrypt(buf)) { + if (holds_alternative<unique_ptr<Channel>>(peer->channel)) { + if (auto dec = std::get<unique_ptr<Channel>>(peer->channel)->decrypt(buf)) { decrypted = std::move(*dec); current = &decrypted; } - } else if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) { - if (auto dec = std::get<Stored<ChannelAccept>>(peer.channel)-> + } else if (holds_alternative<Stored<ChannelAccept>>(peer->channel)) { + if (auto dec = std::get<Stored<ChannelAccept>>(peer->channel)-> data->channel()->decrypt(buf)) { decrypted = std::move(*dec); current = &decrypted; } } - if (auto dec = PartialObject::decodePrefix(peer.partStorage, + if (auto dec = PartialObject::decodePrefix(peer->partStorage, current->begin(), current->end())) { if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { auto pos = std::get<1>(*dec); - while (auto cdec = PartialObject::decodePrefix(peer.partStorage, + while (auto cdec = PartialObject::decodePrefix(peer->partStorage, pos, current->end())) { - peer.partStorage.storeObject(std::get<PartialObject>(*cdec)); + peer->partStorage.storeObject(std::get<PartialObject>(*cdec)); pos = std::get<1>(*cdec); } @@ -411,15 +424,15 @@ void Server::Priv::doListen() scoped_lock hlock(dataMutex); shared_lock slock(selfMutex); - handlePacket(peer, *header, reply); - peer.updateIdentity(reply); - peer.updateChannel(reply); - peer.updateService(reply); + handlePacket(*peer, *header, reply); + peer->updateIdentity(reply); + peer->updateChannel(reply); + peer->updateService(reply); if (!reply.header().empty()) - peer.send(TransportHeader(reply.header()), reply.body(), false); + peer->send(TransportHeader(reply.header()), reply.body(), false); - peer.trySendOutQueue(); + peer->trySendOutQueue(); } } else { std::cerr << "invalid packet\n"; @@ -468,18 +481,48 @@ bool Server::Priv::isSelfAddress(const sockaddr_in6 & paddr) return false; } +Server::Peer * Server::Priv::findPeer(NetworkProtocol::Connection::Id cid) const +{ + scoped_lock lock(dataMutex); + + for (auto & peer : peers) + if (peer->connection.id() == cid) + return peer.get(); + + return nullptr; +} + Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr) { scoped_lock lock(dataMutex); for (auto & peer : peers) - if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0) + if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0) return *peer; auto st = self.ref()->storage().deriveEphemeralStorage(); shared_ptr<Peer> peer(new Peer { .server = *this, - .addr = paddr, + .connection = protocol.connect(paddr), + .identity = monostate(), + .identityUpdates = {}, + .channel = monostate(), + .tempStorage = st, + .partStorage = st.derivePartialStorage(), + }); + peers.push_back(peer); + plist.p->push(peer); + return *peer; +} + +Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn) +{ + scoped_lock lock(dataMutex); + + auto st = self.ref()->storage().deriveEphemeralStorage(); + shared_ptr<Peer> peer(new Peer { + .server = *this, + .connection = move(conn), .identity = monostate(), .identityUpdates = {}, .channel = monostate(), @@ -695,7 +738,7 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o out = std::move(data); if (!out.empty()) - server.protocol.sendto(out, addr); + connection.send(out); } void Server::Peer::updateIdentity(ReplyBuilder &) @@ -831,7 +874,7 @@ void Server::Peer::trySendOutQueue() for (const auto & data : secureOutQueue) { auto out = std::get<unique_ptr<Channel>>(channel)->encrypt(data); - server.protocol.sendto(out, addr); + connection.send(out); } secureOutQueue.clear(); |