summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2019-12-27 17:20:04 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2019-12-27 22:59:53 +0100
commitebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 (patch)
treef4a09848ef09bfd816b8aefd6dc3b4e5a5440905
parenta02ef25970cb97ab4c29b3859799062431ae668b (diff)
Network server sending announcements
-rw-r--r--include/erebos/network.h18
-rw-r--r--include/erebos/storage.h6
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/network.cpp224
-rw-r--r--src/network.h73
5 files changed, 318 insertions, 4 deletions
diff --git a/include/erebos/network.h b/include/erebos/network.h
new file mode 100644
index 0000000..c29096f
--- /dev/null
+++ b/include/erebos/network.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <erebos/identity.h>
+
+namespace erebos {
+
+class Server
+{
+public:
+ Server(const Identity &);
+ ~Server();
+
+private:
+ struct Priv;
+ const std::shared_ptr<Priv> p;
+};
+
+};
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 3777572..95a4574 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -124,10 +124,8 @@ public:
template<typename T> std::optional<Stored<T>> as() const;
- private:
- friend class Record;
- std::string name;
- Variant value;
+ const std::string name;
+ const Variant value;
};
private:
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 090ec66..75eff66 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -4,6 +4,7 @@ include_directories(
add_library(erebos
identity
+ network
pubkey
storage
)
diff --git a/src/network.cpp b/src/network.cpp
new file mode 100644
index 0000000..40d1045
--- /dev/null
+++ b/src/network.cpp
@@ -0,0 +1,224 @@
+#include "network.h"
+
+#include "identity.h"
+
+#include <cstring>
+
+#include <ifaddrs.h>
+#include <net/if.h>
+#include <unistd.h>
+
+using std::scoped_lock;
+using std::unique_lock;
+using std::unique_ptr;
+
+using namespace erebos;
+
+Server::Server(const Identity & self):
+ p(new Priv(self))
+{
+}
+
+Server::~Server() = default;
+
+Server::Priv::Priv(const Identity & self):
+ self(self)
+{
+ struct ifaddrs * raddrs;
+ if (getifaddrs(&raddrs) < 0)
+ throw std::system_error(errno, std::generic_category());
+ unique_ptr<ifaddrs, void(*)(ifaddrs *)> addrs(raddrs, freeifaddrs);
+
+ for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) {
+ if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET &&
+ ifa->ifa_flags & IFF_BROADCAST) {
+ bcastAddresses.push_back(((sockaddr_in*)ifa->ifa_broadaddr)->sin_addr);
+ }
+ }
+
+ sock = socket(AF_INET, SOCK_DGRAM, 0);
+ if (sock < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ int enable = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST,
+ &enable, sizeof(enable)) < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ &enable, sizeof(enable)) < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ sockaddr_in laddr = {};
+ laddr.sin_family = AF_INET;
+ laddr.sin_port = htons(discoveryPort);
+ if (bind(sock, (sockaddr *) &laddr, sizeof(laddr)) < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ threadListen = thread([this] { doListen(); });
+ threadAnnounce = thread([this] { doAnnounce(); });
+}
+
+Server::Priv::~Priv()
+{
+ {
+ scoped_lock lock(dataMutex);
+ finish = true;
+ }
+
+ announceCondvar.notify_all();
+ threadListen.join();
+ threadAnnounce.join();
+
+ if (sock >= 0)
+ close(sock);
+}
+
+void Server::Priv::doListen()
+{
+}
+
+void Server::Priv::doAnnounce()
+{
+ unique_lock<mutex> lock(dataMutex);
+ auto lastAnnounce = steady_clock::now() - announceInterval;
+
+ while (!finish) {
+ auto now = steady_clock::now();
+
+ if (lastAnnounce + announceInterval < now) {
+ TransportHeader header({
+ { TransportHeader::Type::AnnounceSelf, *self.ref() }
+ });
+
+ vector<uint8_t> bytes = header.store(self.ref()->storage())->encode();
+
+ for (const auto & in : bcastAddresses) {
+ sockaddr_in sin = {};
+ sin.sin_family = AF_INET;
+ sin.sin_addr = in;
+ sin.sin_port = htons(discoveryPort);
+ sendto(sock, bytes.data(), bytes.size(), 0, (sockaddr *) &sin, sizeof(sin));
+ }
+
+ lastAnnounce += announceInterval * ((now - lastAnnounce) / announceInterval);
+ }
+
+ announceCondvar.wait_until(lock, lastAnnounce + announceInterval);
+ }
+}
+
+optional<TransportHeader> TransportHeader::load(const Ref & ref)
+{
+ auto rec = ref->asRecord();
+ if (!rec)
+ return nullopt;
+
+ vector<Item> items;
+ for (const auto & item : rec->items()) {
+ if (item.name == "ACK") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::Acknowledged,
+ .value = *ref,
+ });
+ } else if (item.name == "REQ") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::DataRequest,
+ .value = *ref,
+ });
+ } else if (item.name == "RSP") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::DataResponse,
+ .value = *ref,
+ });
+ } else if (item.name == "ANN") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::AnnounceSelf,
+ .value = *ref,
+ });
+ } else if (item.name == "ANU") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::AnnounceUpdate,
+ .value = *ref,
+ });
+ } else if (item.name == "CRQ") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ChannelRequest,
+ .value = *ref,
+ });
+ } else if (item.name == "CAC") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ChannelAccept,
+ .value = *ref,
+ });
+ } else if (item.name == "STP") {
+ if (auto val = item.asText())
+ items.emplace_back(Item {
+ .type = Type::ServiceType,
+ .value = *val,
+ });
+ } else if (item.name == "SRF") {
+ if (auto ref = item.asRef())
+ items.emplace_back(Item {
+ .type = Type::ServiceRef,
+ .value = *ref,
+ });
+ }
+ }
+
+ return TransportHeader { .items = items };
+}
+
+Ref TransportHeader::store(const Storage & st) const
+{
+ vector<Record::Item> ritems;
+
+ for (const auto & item : items) {
+ switch (item.type) {
+ case Type::Acknowledged:
+ ritems.emplace_back("ACK", std::get<Ref>(item.value));
+ break;
+
+ case Type::DataRequest:
+ ritems.emplace_back("REQ", std::get<Ref>(item.value));
+ break;
+
+ case Type::DataResponse:
+ ritems.emplace_back("RSP", std::get<Ref>(item.value));
+ break;
+
+ case Type::AnnounceSelf:
+ ritems.emplace_back("ANN", std::get<Ref>(item.value));
+ break;
+
+ case Type::AnnounceUpdate:
+ ritems.emplace_back("ANU", std::get<Ref>(item.value));
+ break;
+
+ case Type::ChannelRequest:
+ ritems.emplace_back("CRQ", std::get<Ref>(item.value));
+ break;
+
+ case Type::ChannelAccept:
+ ritems.emplace_back("CAC", std::get<Ref>(item.value));
+ break;
+
+ case Type::ServiceType:
+ ritems.emplace_back("STP", std::get<string>(item.value));
+ break;
+
+ case Type::ServiceRef:
+ ritems.emplace_back("SRF", std::get<Ref>(item.value));
+ break;
+ }
+ }
+
+ return st.storeObject(Record(std::move(ritems)));
+}
diff --git a/src/network.h b/src/network.h
new file mode 100644
index 0000000..bf01cfb
--- /dev/null
+++ b/src/network.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <erebos/network.h>
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <netinet/in.h>
+
+using std::condition_variable;
+using std::mutex;
+using std::optional;
+using std::string;
+using std::thread;
+using std::variant;
+using std::vector;
+
+namespace chrono = std::chrono;
+using chrono::steady_clock;
+
+namespace erebos {
+
+struct TransportHeader
+{
+ enum class Type {
+ Acknowledged,
+ DataRequest,
+ DataResponse,
+ AnnounceSelf,
+ AnnounceUpdate,
+ ChannelRequest,
+ ChannelAccept,
+ ServiceType,
+ ServiceRef,
+ };
+
+ struct Item {
+ const Type type;
+ const variant<Ref, string> value;
+ };
+
+ TransportHeader(const vector<Item> & items): items(items) {}
+ static optional<TransportHeader> load(const Ref &);
+ Ref store(const Storage & st) const;
+
+ const vector<Item> items;
+};
+
+struct Server::Priv
+{
+ Priv(const Identity & self);
+ ~Priv();
+ void doListen();
+ void doAnnounce();
+
+ constexpr static uint16_t discoveryPort { 29665 };
+ constexpr static chrono::seconds announceInterval { 60 };
+
+ mutex dataMutex;
+ condition_variable announceCondvar;
+ bool finish = false;
+
+ Identity self;
+ thread threadListen;
+ thread threadAnnounce;
+
+ int sock;
+ vector<in_addr> bcastAddresses;
+};
+
+}