diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/network.cpp | 69 | ||||
-rw-r--r-- | src/network.h | 32 | ||||
-rw-r--r-- | src/network/channel.cpp (renamed from src/channel.cpp) | 0 | ||||
-rw-r--r-- | src/network/channel.h (renamed from src/channel.h) | 2 | ||||
-rw-r--r-- | src/network/protocol.cpp | 7 | ||||
-rw-r--r-- | src/network/protocol.h | 35 |
7 files changed, 72 insertions, 75 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 478fc50..fff6242 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,13 +4,13 @@ include_directories( add_library(erebos attach.cpp - channel.cpp contact.cpp frp.cpp identity.cpp merge.cpp message.cpp network.cpp + network/channel.cpp network/protocol.cpp pairing.cpp pubkey.cpp diff --git a/src/network.cpp b/src/network.cpp index 725ac50..455496c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -207,13 +207,6 @@ void Peer::Priv::notifyWatchers() } } -bool Peer::hasChannel() const -{ - if (auto speer = p->speer.lock()) - return holds_alternative<unique_ptr<Channel>>(speer->channel); - return false; -} - bool Peer::send(UUID uuid, const Ref & ref) const { return send(uuid, ref, *ref); @@ -396,13 +389,13 @@ void Server::Priv::doListen() continue; current = &buf; - if (holds_alternative<unique_ptr<Channel>>(peer->channel)) { - if (auto dec = std::get<unique_ptr<Channel>>(peer->channel)->decrypt(buf)) { + if (holds_alternative<unique_ptr<Channel>>(peer->connection.channel())) { + if (auto dec = std::get<unique_ptr<Channel>>(peer->connection.channel())->decrypt(buf)) { decrypted = std::move(*dec); current = &decrypted; } - } else if (holds_alternative<Stored<ChannelAccept>>(peer->channel)) { - if (auto dec = std::get<Stored<ChannelAccept>>(peer->channel)-> + } else if (holds_alternative<Stored<ChannelAccept>>(peer->connection.channel())) { + if (auto dec = std::get<Stored<ChannelAccept>>(peer->connection.channel())-> data->channel()->decrypt(buf)) { decrypted = std::move(*dec); current = &decrypted; @@ -508,7 +501,6 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr) .connection = protocol.connect(paddr), .identity = monostate(), .identityUpdates = {}, - .channel = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }); @@ -527,7 +519,6 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn) .connection = move(conn), .identity = monostate(), .identityUpdates = {}, - .channel = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }); @@ -548,16 +539,16 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head switch (item.type) { case NetworkProtocol::Header::Type::Acknowledged: { auto dgst = std::get<Digest>(item.value); - if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && - std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst) + if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) && + std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() == dgst) peer.finalizeChannel(reply, - std::get<Stored<ChannelAccept>>(peer.channel)->data->channel()); + std::get<Stored<ChannelAccept>>(peer.connection.channel())->data->channel()); break; } case NetworkProtocol::Header::Type::DataRequest: { auto dgst = std::get<Digest>(item.value); - if (holds_alternative<unique_ptr<Channel>>(peer.channel) || + if (holds_alternative<unique_ptr<Channel>>(peer.connection.channel()) || plaintextRefs.find(dgst) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(dgst)) { reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() }); @@ -595,7 +586,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, .ref = peer.partStorage.ref(dgst), - .peer = peer, .missing = {}, }); waiting.push_back(wref); @@ -613,7 +603,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, .ref = peer.partStorage.ref(dgst), - .peer = peer, .missing = {}, }); waiting.push_back(wref); @@ -626,29 +615,28 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head auto dgst = std::get<Digest>(item.value); reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst }); - if (holds_alternative<Stored<ChannelRequest>>(peer.channel) && - std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst) + if (holds_alternative<Stored<ChannelRequest>>(peer.connection.channel()) && + std::get<Stored<ChannelRequest>>(peer.connection.channel()).ref().digest() < dgst) break; - if (holds_alternative<Stored<ChannelAccept>>(peer.channel)) + if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel())) break; shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, .ref = peer.partStorage.ref(dgst), - .peer = peer, .missing = {}, }); waiting.push_back(wref); - peer.channel = wref; + peer.connection.channel() = wref; wref->check(reply); break; } case NetworkProtocol::Header::Type::ChannelAccept: { auto dgst = std::get<Digest>(item.value); - if (holds_alternative<Stored<ChannelAccept>>(peer.channel) && - std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst) + if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) && + std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() < dgst) break; auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst)); @@ -685,9 +673,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head shared_ptr<WaitingRef> wref(new WaitingRef { .storage = peer.tempStorage, - .ref = pref, - .peer = peer, - .missing = {}, + .ref = pref, + .missing = {}, }); waiting.push_back(wref); peer.serviceQueue.emplace_back(*serviceType, wref); @@ -732,8 +719,8 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj data.insert(data.end(), part.begin(), part.end()); } - if (holds_alternative<unique_ptr<Channel>>(channel)) - out = std::get<unique_ptr<Channel>>(channel)->encrypt(data); + if (holds_alternative<unique_ptr<Channel>>(connection.channel())) + out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data); else if (secure) secureOutQueue.emplace_back(move(data)); else @@ -784,10 +771,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) if (!holds_alternative<Identity>(identity)) return; - if (holds_alternative<monostate>(channel)) { + if (holds_alternative<monostate>(connection.channel())) { auto req = Channel::generateRequest(tempStorage, server.self, std::get<Identity>(identity)); - channel.emplace<Stored<ChannelRequest>>(req); + connection.channel().emplace<Stored<ChannelRequest>>(req); reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() }); reply.body(req.ref()); reply.body(req->data.ref()); @@ -796,13 +783,13 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) reply.body(sig.ref()); } - if (holds_alternative<shared_ptr<WaitingRef>>(channel)) { - if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) { + if (holds_alternative<shared_ptr<WaitingRef>>(connection.channel())) { + if (auto ref = std::get<shared_ptr<WaitingRef>>(connection.channel())->check(reply)) { auto req = Stored<ChannelRequest>::load(*ref); if (holds_alternative<Identity>(identity) && req->isSignedBy(std::get<Identity>(identity).keyMessage())) { if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) { - channel.emplace<Stored<ChannelAccept>>(*acc); + connection.channel().emplace<Stored<ChannelAccept>>(*acc); reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() }); reply.body(acc->ref()); reply.body(acc.value()->data.ref()); @@ -810,10 +797,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) for (const auto & sig : acc.value()->sigs) reply.body(sig.ref()); } else { - channel = monostate(); + connection.channel() = monostate(); } } else { - channel = monostate(); + connection.channel() = monostate(); } } } @@ -821,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply) void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch) { - channel.emplace<unique_ptr<Channel>>(move(ch)); + connection.channel().emplace<unique_ptr<Channel>>(move(ch)); vector<NetworkProtocol::Header::Item> hitems; for (const auto & r : server.self.refs()) @@ -871,11 +858,11 @@ void Server::Peer::trySendOutQueue() if (secureOutQueue.empty()) return; - if (!holds_alternative<unique_ptr<Channel>>(channel)) + if (!holds_alternative<unique_ptr<Channel>>(connection.channel())) return; for (const auto & data : secureOutQueue) { - auto out = std::get<unique_ptr<Channel>>(channel)->encrypt(data); + auto out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data); connection.send(out); } diff --git a/src/network.h b/src/network.h index c3a2074..2959adc 100644 --- a/src/network.h +++ b/src/network.h @@ -2,7 +2,6 @@ #include <erebos/network.h> -#include "channel.h" #include "network/protocol.h" #include <condition_variable> @@ -51,12 +50,6 @@ struct Server::Peer Identity> identity; vector<shared_ptr<WaitingRef>> identityUpdates; - variant<monostate, - Stored<ChannelRequest>, - shared_ptr<struct WaitingRef>, - Stored<ChannelAccept>, - unique_ptr<Channel>> channel; - Storage tempStorage; PartialStorage partStorage; @@ -91,31 +84,6 @@ struct PeerList::Priv : enable_shared_from_this<PeerList::Priv> void push(const shared_ptr<Server::Peer> &); }; -class ReplyBuilder -{ -public: - void header(NetworkProtocol::Header::Item &&); - void body(const Ref &); - - const vector<NetworkProtocol::Header::Item> & header() const { return mheader; } - vector<Object> body() const; - -private: - vector<NetworkProtocol::Header::Item> mheader; - vector<Ref> mbody; -}; - -struct WaitingRef -{ - const Storage storage; - const PartialRef ref; - const Server::Peer & peer; - vector<Digest> missing; - - optional<Ref> check(); - optional<Ref> check(ReplyBuilder &); -}; - struct Server::Priv { Priv(const Head<LocalState> & local, const Identity & self); diff --git a/src/channel.cpp b/src/network/channel.cpp index b317f3d..b317f3d 100644 --- a/src/channel.cpp +++ b/src/network/channel.cpp diff --git a/src/channel.h b/src/network/channel.h index 5f1786e..f932c84 100644 --- a/src/channel.h +++ b/src/network/channel.h @@ -2,7 +2,7 @@ #include <erebos/storage.h> -#include "identity.h" +#include "../identity.h" #include <atomic> #include <memory> diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index fb3a5ea..4151bf2 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -24,6 +24,8 @@ struct NetworkProtocol::ConnectionPriv mutex cmutex {}; vector<uint8_t> buffer {}; + + ChannelState channel = monostate(); }; @@ -202,6 +204,11 @@ void NetworkProtocol::Connection::close() p = nullptr; } +NetworkProtocol::ChannelState & NetworkProtocol::Connection::channel() +{ + return p->channel; +} + /******************************************************************************/ /* Header */ diff --git a/src/network/protocol.h b/src/network/protocol.h index 4794ba6..88abf67 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -1,5 +1,7 @@ #pragma once +#include "channel.h" + #include <erebos/storage.h> #include <netinet/in.h> @@ -45,6 +47,12 @@ public: PollResult poll(); + using ChannelState = variant<monostate, + Stored<ChannelRequest>, + shared_ptr<struct WaitingRef>, + Stored<ChannelAccept>, + unique_ptr<Channel>>; + Connection connect(sockaddr_in6 addr); bool recvfrom(vector<uint8_t> & buffer, sockaddr_in6 & addr); @@ -84,6 +92,9 @@ public: void close(); + // temporary: + ChannelState & channel(); + private: unique_ptr<ConnectionPriv> p; }; @@ -121,4 +132,28 @@ struct NetworkProtocol::Header const vector<Item> items; }; +class ReplyBuilder +{ +public: + void header(NetworkProtocol::Header::Item &&); + void body(const Ref &); + + const vector<NetworkProtocol::Header::Item> & header() const { return mheader; } + vector<Object> body() const; + +private: + vector<NetworkProtocol::Header::Item> mheader; + vector<Ref> mbody; +}; + +struct WaitingRef +{ + const Storage storage; + const PartialRef ref; + vector<Digest> missing; + + optional<Ref> check(); + optional<Ref> check(ReplyBuilder &); +}; + } |