summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2020-02-16 20:18:32 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2020-02-16 20:18:32 +0100
commit0e9e9c4d233a331e10dfb2db889fe437d0911ba2 (patch)
tree450b78801436d007efd5eaaf2b52f3e7dfe95a96 /src
parent69e4c826a34eb84c36bb07338a9a292a520f5970 (diff)
Peer list in public API
Diffstat (limited to 'src')
-rw-r--r--src/identity.cpp7
-rw-r--r--src/network.cpp95
-rw-r--r--src/network.h26
3 files changed, 123 insertions, 5 deletions
diff --git a/src/identity.cpp b/src/identity.cpp
index 00abf0b..61059ab 100644
--- a/src/identity.cpp
+++ b/src/identity.cpp
@@ -46,6 +46,13 @@ optional<Identity> Identity::owner() const
return p->owner;
}
+const Identity & Identity::finalOwner() const
+{
+ if (p->owner)
+ return p->owner->finalOwner();
+ return *this;
+}
+
Stored<PublicKey> Identity::keyMessage() const
{
return p->keyMessage;
diff --git a/src/network.cpp b/src/network.cpp
index f778bf9..f3a3651 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -6,12 +6,14 @@
#include <cstring>
#include <iostream>
+#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <unistd.h>
using std::holds_alternative;
using std::scoped_lock;
+using std::to_string;
using std::unique_lock;
using namespace erebos;
@@ -23,6 +25,87 @@ Server::Server(const Identity & self):
Server::~Server() = default;
+PeerList & Server::peerList() const
+{
+ return p->plist;
+}
+
+
+Peer::Peer(const shared_ptr<Priv> & p): p(p) {}
+Peer::~Peer() = default;
+
+string Peer::name() const
+{
+ if (auto speer = p->speer.lock()) {
+ if (holds_alternative<Identity>(speer->identity))
+ if (auto name = std::get<Identity>(speer->identity).finalOwner().name())
+ return *name;
+ if (holds_alternative<shared_ptr<WaitingRef>>(speer->identity))
+ return string(std::get<shared_ptr<WaitingRef>>(speer->identity)->ref.digest());
+
+ char buf[16];
+ if (inet_ntop(AF_INET, &speer->addr.sin_addr, buf, sizeof(buf)))
+ return string(buf) + ":" + to_string(ntohs(speer->addr.sin_port));
+ return "<invalid address>";
+ }
+ return "<server closed>";
+}
+
+optional<Identity> Peer::identity() const
+{
+ if (auto speer = p->speer.lock())
+ if (holds_alternative<Identity>(speer->identity))
+ return std::get<Identity>(speer->identity);
+ return nullopt;
+}
+
+void Peer::Priv::notifyWatchers()
+{
+ if (auto slist = list.lock()) {
+ Peer p(shared_from_this());
+ for (const auto & w : slist->watchers)
+ w(listIndex, &p);
+ }
+}
+
+
+PeerList::PeerList(): p(new Priv) {}
+PeerList::PeerList(const shared_ptr<PeerList::Priv> & p): p(p) {}
+PeerList::~PeerList() = default;
+
+void PeerList::Priv::push(const shared_ptr<Server::Peer> & speer)
+{
+ scoped_lock lock(dataMutex);
+ size_t s = peers.size();
+
+ speer->lpeer.reset(new Peer::Priv);
+ speer->lpeer->speer = speer;
+ speer->lpeer->list = shared_from_this();
+ speer->lpeer->listIndex = s;
+
+ Peer p(speer->lpeer);
+
+ peers.push_back(speer->lpeer);
+ for (const auto & w : watchers)
+ w(s, &p);
+}
+
+size_t PeerList::size() const
+{
+ return p->peers.size();
+}
+
+Peer PeerList::at(size_t i) const
+{
+ return Peer(p->peers.at(i));
+}
+
+void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
+{
+ p->watchers.push_back(w);
+}
+
+
Server::Priv::Priv(const Identity & self):
self(self)
{
@@ -174,15 +257,16 @@ Server::Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
return *peer;
auto st = self.ref()->storage().deriveEphemeralStorage();
- Peer * peer = new Peer {
+ shared_ptr<Peer> peer(new Peer {
.server = *this,
.addr = paddr,
.identity = monostate(),
.channel = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
- };
- peers.emplace_back(peer);
+ });
+ peers.push_back(peer);
+ plist.p->push(peer);
return *peer;
}
@@ -328,8 +412,11 @@ void Server::Peer::updateIdentity(ReplyBuilder & reply)
{
if (holds_alternative<shared_ptr<WaitingRef>>(identity))
if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check(&reply.header))
- if (auto id = Identity::load(*ref))
+ if (auto id = Identity::load(*ref)) {
identity.emplace<Identity>(*id);
+ if (lpeer)
+ lpeer->notifyWatchers();
+ }
}
void Server::Peer::updateChannel(ReplyBuilder & reply)
diff --git a/src/network.h b/src/network.h
index e07e020..07b5363 100644
--- a/src/network.h
+++ b/src/network.h
@@ -23,6 +23,8 @@ using std::variant;
using std::vector;
using std::weak_ptr;
+using std::enable_shared_from_this;
+
namespace chrono = std::chrono;
using chrono::steady_clock;
@@ -49,11 +51,31 @@ struct Server::Peer
Storage tempStorage;
PartialStorage partStorage;
+ shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
+
void send(const struct TransportHeader &, const vector<Object> &) const;
void updateIdentity(struct ReplyBuilder &);
void updateChannel(struct ReplyBuilder &);
};
+struct Peer::Priv : enable_shared_from_this<Peer::Priv>
+{
+ weak_ptr<Server::Peer> speer;
+ weak_ptr<PeerList::Priv> list;
+ size_t listIndex;
+
+ void notifyWatchers();
+};
+
+struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>
+{
+ mutex dataMutex;
+ vector<shared_ptr<Peer::Priv>> peers;
+ vector<function<void(size_t, const Peer *)>> watchers;
+
+ void push(const shared_ptr<Server::Peer> &);
+};
+
struct TransportHeader
{
enum class Type {
@@ -118,7 +140,9 @@ struct Server::Priv
thread threadListen;
thread threadAnnounce;
- vector<unique_ptr<Peer>> peers;
+ vector<shared_ptr<Peer>> peers;
+ PeerList plist;
+
vector<struct TransportHeader> outgoing;
vector<weak_ptr<WaitingRef>> waiting;