summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/network.h1
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/network.cpp69
-rw-r--r--src/network.h32
-rw-r--r--src/network/channel.cpp (renamed from src/channel.cpp)0
-rw-r--r--src/network/channel.h (renamed from src/channel.h)2
-rw-r--r--src/network/protocol.cpp7
-rw-r--r--src/network/protocol.h35
8 files changed, 72 insertions, 76 deletions
diff --git a/include/erebos/network.h b/include/erebos/network.h
index 6a3112a..2761a40 100644
--- a/include/erebos/network.h
+++ b/include/erebos/network.h
@@ -99,7 +99,6 @@ public:
string addressStr() const;
uint16_t port() const;
- bool hasChannel() const;
bool send(UUID, const Ref &) const;
bool send(UUID, const Object &) const;
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 478fc50..fff6242 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -4,13 +4,13 @@ include_directories(
add_library(erebos
attach.cpp
- channel.cpp
contact.cpp
frp.cpp
identity.cpp
merge.cpp
message.cpp
network.cpp
+ network/channel.cpp
network/protocol.cpp
pairing.cpp
pubkey.cpp
diff --git a/src/network.cpp b/src/network.cpp
index 725ac50..455496c 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -207,13 +207,6 @@ void Peer::Priv::notifyWatchers()
}
}
-bool Peer::hasChannel() const
-{
- if (auto speer = p->speer.lock())
- return holds_alternative<unique_ptr<Channel>>(speer->channel);
- return false;
-}
-
bool Peer::send(UUID uuid, const Ref & ref) const
{
return send(uuid, ref, *ref);
@@ -396,13 +389,13 @@ void Server::Priv::doListen()
continue;
current = &buf;
- if (holds_alternative<unique_ptr<Channel>>(peer->channel)) {
- if (auto dec = std::get<unique_ptr<Channel>>(peer->channel)->decrypt(buf)) {
+ if (holds_alternative<unique_ptr<Channel>>(peer->connection.channel())) {
+ if (auto dec = std::get<unique_ptr<Channel>>(peer->connection.channel())->decrypt(buf)) {
decrypted = std::move(*dec);
current = &decrypted;
}
- } else if (holds_alternative<Stored<ChannelAccept>>(peer->channel)) {
- if (auto dec = std::get<Stored<ChannelAccept>>(peer->channel)->
+ } else if (holds_alternative<Stored<ChannelAccept>>(peer->connection.channel())) {
+ if (auto dec = std::get<Stored<ChannelAccept>>(peer->connection.channel())->
data->channel()->decrypt(buf)) {
decrypted = std::move(*dec);
current = &decrypted;
@@ -508,7 +501,6 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr)
.connection = protocol.connect(paddr),
.identity = monostate(),
.identityUpdates = {},
- .channel = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
});
@@ -527,7 +519,6 @@ Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)
.connection = move(conn),
.identity = monostate(),
.identityUpdates = {},
- .channel = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
});
@@ -548,16 +539,16 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
switch (item.type) {
case NetworkProtocol::Header::Type::Acknowledged: {
auto dgst = std::get<Digest>(item.value);
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
- std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() == dgst)
+ if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) &&
+ std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() == dgst)
peer.finalizeChannel(reply,
- std::get<Stored<ChannelAccept>>(peer.channel)->data->channel());
+ std::get<Stored<ChannelAccept>>(peer.connection.channel())->data->channel());
break;
}
case NetworkProtocol::Header::Type::DataRequest: {
auto dgst = std::get<Digest>(item.value);
- if (holds_alternative<unique_ptr<Channel>>(peer.channel) ||
+ if (holds_alternative<unique_ptr<Channel>>(peer.connection.channel()) ||
plaintextRefs.find(dgst) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(dgst)) {
reply.header({ NetworkProtocol::Header::Type::DataResponse, ref->digest() });
@@ -595,7 +586,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
.ref = peer.partStorage.ref(dgst),
- .peer = peer,
.missing = {},
});
waiting.push_back(wref);
@@ -613,7 +603,6 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
.ref = peer.partStorage.ref(dgst),
- .peer = peer,
.missing = {},
});
waiting.push_back(wref);
@@ -626,29 +615,28 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
auto dgst = std::get<Digest>(item.value);
reply.header({ NetworkProtocol::Header::Type::Acknowledged, dgst });
- if (holds_alternative<Stored<ChannelRequest>>(peer.channel) &&
- std::get<Stored<ChannelRequest>>(peer.channel).ref().digest() < dgst)
+ if (holds_alternative<Stored<ChannelRequest>>(peer.connection.channel()) &&
+ std::get<Stored<ChannelRequest>>(peer.connection.channel()).ref().digest() < dgst)
break;
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel))
+ if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()))
break;
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
.ref = peer.partStorage.ref(dgst),
- .peer = peer,
.missing = {},
});
waiting.push_back(wref);
- peer.channel = wref;
+ peer.connection.channel() = wref;
wref->check(reply);
break;
}
case NetworkProtocol::Header::Type::ChannelAccept: {
auto dgst = std::get<Digest>(item.value);
- if (holds_alternative<Stored<ChannelAccept>>(peer.channel) &&
- std::get<Stored<ChannelAccept>>(peer.channel).ref().digest() < dgst)
+ if (holds_alternative<Stored<ChannelAccept>>(peer.connection.channel()) &&
+ std::get<Stored<ChannelAccept>>(peer.connection.channel()).ref().digest() < dgst)
break;
auto cres = peer.tempStorage.copy(peer.partStorage.ref(dgst));
@@ -685,9 +673,8 @@ void Server::Priv::handlePacket(Server::Peer & peer, const NetworkProtocol::Head
shared_ptr<WaitingRef> wref(new WaitingRef {
.storage = peer.tempStorage,
- .ref = pref,
- .peer = peer,
- .missing = {},
+ .ref = pref,
+ .missing = {},
});
waiting.push_back(wref);
peer.serviceQueue.emplace_back(*serviceType, wref);
@@ -732,8 +719,8 @@ void Server::Peer::send(const NetworkProtocol::Header & header, const vector<Obj
data.insert(data.end(), part.begin(), part.end());
}
- if (holds_alternative<unique_ptr<Channel>>(channel))
- out = std::get<unique_ptr<Channel>>(channel)->encrypt(data);
+ if (holds_alternative<unique_ptr<Channel>>(connection.channel()))
+ out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);
else if (secure)
secureOutQueue.emplace_back(move(data));
else
@@ -784,10 +771,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
if (!holds_alternative<Identity>(identity))
return;
- if (holds_alternative<monostate>(channel)) {
+ if (holds_alternative<monostate>(connection.channel())) {
auto req = Channel::generateRequest(tempStorage,
server.self, std::get<Identity>(identity));
- channel.emplace<Stored<ChannelRequest>>(req);
+ connection.channel().emplace<Stored<ChannelRequest>>(req);
reply.header({ NetworkProtocol::Header::Type::ChannelRequest, req.ref().digest() });
reply.body(req.ref());
reply.body(req->data.ref());
@@ -796,13 +783,13 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
reply.body(sig.ref());
}
- if (holds_alternative<shared_ptr<WaitingRef>>(channel)) {
- if (auto ref = std::get<shared_ptr<WaitingRef>>(channel)->check(reply)) {
+ if (holds_alternative<shared_ptr<WaitingRef>>(connection.channel())) {
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(connection.channel())->check(reply)) {
auto req = Stored<ChannelRequest>::load(*ref);
if (holds_alternative<Identity>(identity) &&
req->isSignedBy(std::get<Identity>(identity).keyMessage())) {
if (auto acc = Channel::acceptRequest(server.self, std::get<Identity>(identity), req)) {
- channel.emplace<Stored<ChannelAccept>>(*acc);
+ connection.channel().emplace<Stored<ChannelAccept>>(*acc);
reply.header({ NetworkProtocol::Header::Type::ChannelAccept, acc->ref().digest() });
reply.body(acc->ref());
reply.body(acc.value()->data.ref());
@@ -810,10 +797,10 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
for (const auto & sig : acc.value()->sigs)
reply.body(sig.ref());
} else {
- channel = monostate();
+ connection.channel() = monostate();
}
} else {
- channel = monostate();
+ connection.channel() = monostate();
}
}
}
@@ -821,7 +808,7 @@ void Server::Peer::updateChannel(ReplyBuilder & reply)
void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)
{
- channel.emplace<unique_ptr<Channel>>(move(ch));
+ connection.channel().emplace<unique_ptr<Channel>>(move(ch));
vector<NetworkProtocol::Header::Item> hitems;
for (const auto & r : server.self.refs())
@@ -871,11 +858,11 @@ void Server::Peer::trySendOutQueue()
if (secureOutQueue.empty())
return;
- if (!holds_alternative<unique_ptr<Channel>>(channel))
+ if (!holds_alternative<unique_ptr<Channel>>(connection.channel()))
return;
for (const auto & data : secureOutQueue) {
- auto out = std::get<unique_ptr<Channel>>(channel)->encrypt(data);
+ auto out = std::get<unique_ptr<Channel>>(connection.channel())->encrypt(data);
connection.send(out);
}
diff --git a/src/network.h b/src/network.h
index c3a2074..2959adc 100644
--- a/src/network.h
+++ b/src/network.h
@@ -2,7 +2,6 @@
#include <erebos/network.h>
-#include "channel.h"
#include "network/protocol.h"
#include <condition_variable>
@@ -51,12 +50,6 @@ struct Server::Peer
Identity> identity;
vector<shared_ptr<WaitingRef>> identityUpdates;
- variant<monostate,
- Stored<ChannelRequest>,
- shared_ptr<struct WaitingRef>,
- Stored<ChannelAccept>,
- unique_ptr<Channel>> channel;
-
Storage tempStorage;
PartialStorage partStorage;
@@ -91,31 +84,6 @@ struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>
void push(const shared_ptr<Server::Peer> &);
};
-class ReplyBuilder
-{
-public:
- void header(NetworkProtocol::Header::Item &&);
- void body(const Ref &);
-
- const vector<NetworkProtocol::Header::Item> & header() const { return mheader; }
- vector<Object> body() const;
-
-private:
- vector<NetworkProtocol::Header::Item> mheader;
- vector<Ref> mbody;
-};
-
-struct WaitingRef
-{
- const Storage storage;
- const PartialRef ref;
- const Server::Peer & peer;
- vector<Digest> missing;
-
- optional<Ref> check();
- optional<Ref> check(ReplyBuilder &);
-};
-
struct Server::Priv
{
Priv(const Head<LocalState> & local, const Identity & self);
diff --git a/src/channel.cpp b/src/network/channel.cpp
index b317f3d..b317f3d 100644
--- a/src/channel.cpp
+++ b/src/network/channel.cpp
diff --git a/src/channel.h b/src/network/channel.h
index 5f1786e..f932c84 100644
--- a/src/channel.h
+++ b/src/network/channel.h
@@ -2,7 +2,7 @@
#include <erebos/storage.h>
-#include "identity.h"
+#include "../identity.h"
#include <atomic>
#include <memory>
diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp
index fb3a5ea..4151bf2 100644
--- a/src/network/protocol.cpp
+++ b/src/network/protocol.cpp
@@ -24,6 +24,8 @@ struct NetworkProtocol::ConnectionPriv
mutex cmutex {};
vector<uint8_t> buffer {};
+
+ ChannelState channel = monostate();
};
@@ -202,6 +204,11 @@ void NetworkProtocol::Connection::close()
p = nullptr;
}
+NetworkProtocol::ChannelState & NetworkProtocol::Connection::channel()
+{
+ return p->channel;
+}
+
/******************************************************************************/
/* Header */
diff --git a/src/network/protocol.h b/src/network/protocol.h
index 4794ba6..88abf67 100644
--- a/src/network/protocol.h
+++ b/src/network/protocol.h
@@ -1,5 +1,7 @@
#pragma once
+#include "channel.h"
+
#include <erebos/storage.h>
#include <netinet/in.h>
@@ -45,6 +47,12 @@ public:
PollResult poll();
+ using ChannelState = variant<monostate,
+ Stored<ChannelRequest>,
+ shared_ptr<struct WaitingRef>,
+ Stored<ChannelAccept>,
+ unique_ptr<Channel>>;
+
Connection connect(sockaddr_in6 addr);
bool recvfrom(vector<uint8_t> & buffer, sockaddr_in6 & addr);
@@ -84,6 +92,9 @@ public:
void close();
+ // temporary:
+ ChannelState & channel();
+
private:
unique_ptr<ConnectionPriv> p;
};
@@ -121,4 +132,28 @@ struct NetworkProtocol::Header
const vector<Item> items;
};
+class ReplyBuilder
+{
+public:
+ void header(NetworkProtocol::Header::Item &&);
+ void body(const Ref &);
+
+ const vector<NetworkProtocol::Header::Item> & header() const { return mheader; }
+ vector<Object> body() const;
+
+private:
+ vector<NetworkProtocol::Header::Item> mheader;
+ vector<Ref> mbody;
+};
+
+struct WaitingRef
+{
+ const Storage storage;
+ const PartialRef ref;
+ vector<Digest> missing;
+
+ optional<Ref> check();
+ optional<Ref> check(ReplyBuilder &);
+};
+
}