summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-08-13 19:01:48 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-08-16 21:49:39 +0200
commit2ed8103ff1c0fca7372b3c3888f590ba41c525e6 (patch)
tree103834746f4b64c7dbaf4a237447108cdf44c8d9 /src/network.cpp
parent7420a170928da75cb860e3fc8804416babdeec8c (diff)
Connection class for network protocol
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp87
1 files changed, 65 insertions, 22 deletions
diff --git a/src/network.cpp b/src/network.cpp
index b5dfd68..786e752 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -175,7 +175,7 @@ optional<Identity> Peer::identity() const
const sockaddr_in6 & Peer::address() const
{
if (auto speer = p->speer.lock())
- return speer->addr;
+ return speer->connection.peerAddress();
throw runtime_error("Server no longer running");
}
@@ -373,36 +373,49 @@ void Server::Priv::doListen()
for (; !finish; lock.lock()) {
lock.unlock();
- sockaddr_in6 paddr;
- if (not protocol.recvfrom(buf, paddr))
+ Peer * peer = nullptr;
+ auto res = protocol.poll();
+
+ if (holds_alternative<NetworkProtocol::ProtocolClosed>(res))
break;
- if (isSelfAddress(paddr))
+ if (holds_alternative<NetworkProtocol::NewConnection>(res)) {
+ auto & conn = get<NetworkProtocol::NewConnection>(res).conn;
+ if (not isSelfAddress(conn.peerAddress()))
+ peer = &addPeer(move(conn));
+ }
+
+ if (holds_alternative<NetworkProtocol::ConnectionReadReady>(res)) {
+ peer = findPeer(get<NetworkProtocol::ConnectionReadReady>(res).id);
+ }
+
+ if (!peer)
continue;
- auto & peer = getPeer(paddr);
+ if (not peer->connection.receive(buf))
+ continue;
current = &buf;
- if (holds_alternative<unique_ptr<Channel>>(peer.channel)) {
- if (auto dec = std::get<unique_ptr<Channel>>(peer.channel)->decrypt(buf)) {
+ if (holds_alternative<unique_ptr<Channel>>(peer->channel)) {
+ if (auto dec = std::get<unique_ptr<Channel>>(peer->channel)->decrypt(buf)) {
decrypted = std::move(*dec);
current = &decrypted;
}
- } else if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) {
- if (auto dec = std::get<Stored<ChannelAccept>>(peer.channel)->
+ } else if (holds_alternative<Stored<ChannelAccept>>(peer->channel)) {
+ if (auto dec = std::get<Stored<ChannelAccept>>(peer->channel)->
data->channel()->decrypt(buf)) {
decrypted = std::move(*dec);
current = &decrypted;
}
}
- if (auto dec = PartialObject::decodePrefix(peer.partStorage,
+ if (auto dec = PartialObject::decodePrefix(peer->partStorage,
current->begin(), current->end())) {
if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) {
auto pos = std::get<1>(*dec);
- while (auto cdec = PartialObject::decodePrefix(peer.partStorage,
+ while (auto cdec = PartialObject::decodePrefix(peer->partStorage,
pos, current->end())) {
- peer.partStorage.storeObject(std::get<PartialObject>(*cdec));
+ peer->partStorage.storeObject(std::get<PartialObject>(*cdec));
pos = std::get<1>(*cdec);
}
@@ -411,15 +424,15 @@ void Server::Priv::doListen()
scoped_lock hlock(dataMutex);
shared_lock slock(selfMutex);
- handlePacket(peer, *header, reply);
- peer.updateIdentity(reply);
- peer.updateChannel(reply);
- peer.updateService(reply);
+ handlePacket(*peer, *header, reply);
+ peer->updateIdentity(reply);
+ peer->updateChannel(reply);
+ peer->updateService(reply);
if (!reply.header().empty())
- peer.send(TransportHeader(reply.header()), reply.body(), false);
+ peer->send(TransportHeader(reply.header()), reply.body(), false);
- peer.trySendOutQueue();
+ peer->trySendOutQueue();
}
} else {
std::cerr << "invalid packet\n";
@@ -468,18 +481,48 @@ bool Server::Priv::isSelfAddress(const sockaddr_in6 & paddr)
return false;
}
+Server::Peer * Server::Priv::findPeer(NetworkProtocol::Connection::Id cid) const
+{
+ scoped_lock lock(dataMutex);
+
+ for (auto & peer : peers)
+ if (peer->connection.id() == cid)
+ return peer.get();
+
+ return nullptr;
+}
+
Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr)
{
scoped_lock lock(dataMutex);
for (auto & peer : peers)
- if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0)
+ if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0)
return *peer;
auto st = self.ref()->storage().deriveEphemeralStorage();
shared_ptr<Peer> peer(new Peer {
.server = *this,
- .addr = paddr,
+ .connection = protocol.connect(paddr),
+ .identity = monostate(),
+ .identityUpdates = {},
+ .channel = monostate(),
+ .tempStorage = st,
+ .partStorage = st.derivePartialStorage(),
+ });
+ peers.push_back(peer);
+ plist.p->push(peer);
+ return *peer;
+}
+
+Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)
+{
+ scoped_lock lock(dataMutex);
+
+ auto st = self.ref()->storage().deriveEphemeralStorage();
+ shared_ptr<Peer> peer(new Peer {
+ .server = *this,
+ .connection = move(conn),
.identity = monostate(),
.identityUpdates = {},
.channel = monostate(),
@@ -695,7 +738,7 @@ void Server::Peer::send(const TransportHeader & header, const vector<Object> & o
out = std::move(data);
if (!out.empty())
- server.protocol.sendto(out, addr);
+ connection.send(out);
}
void Server::Peer::updateIdentity(ReplyBuilder &)
@@ -831,7 +874,7 @@ void Server::Peer::trySendOutQueue()
for (const auto & data : secureOutQueue) {
auto out = std::get<unique_ptr<Channel>>(channel)->encrypt(data);
- server.protocol.sendto(out, addr);
+ connection.send(out);
}
secureOutQueue.clear();