From ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Roman=20Smr=C5=BE?= <roman.smrz@seznam.cz>
Date: Fri, 27 Dec 2019 17:20:04 +0100
Subject: Network server sending announcements

---
 include/erebos/network.h |  18 ++++
 include/erebos/storage.h |   6 +-
 src/CMakeLists.txt       |   1 +
 src/network.cpp          | 224 +++++++++++++++++++++++++++++++++++++++++++++++
 src/network.h            |  73 +++++++++++++++
 5 files changed, 318 insertions(+), 4 deletions(-)
 create mode 100644 include/erebos/network.h
 create mode 100644 src/network.cpp
 create mode 100644 src/network.h

diff --git a/include/erebos/network.h b/include/erebos/network.h
new file mode 100644
index 0000000..c29096f
--- /dev/null
+++ b/include/erebos/network.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <erebos/identity.h>
+
+namespace erebos {
+
+class Server
+{
+public:
+	Server(const Identity &);
+	~Server();
+
+private:
+	struct Priv;
+	const std::shared_ptr<Priv> p;
+};
+
+};
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 3777572..95a4574 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -124,10 +124,8 @@ public:
 
 		template<typename T> std::optional<Stored<T>> as() const;
 
-	private:
-		friend class Record;
-		std::string name;
-		Variant value;
+		const std::string name;
+		const Variant value;
 	};
 
 private:
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 090ec66..75eff66 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -4,6 +4,7 @@ include_directories(
 
 add_library(erebos
 	identity
+	network
 	pubkey
 	storage
 )
diff --git a/src/network.cpp b/src/network.cpp
new file mode 100644
index 0000000..40d1045
--- /dev/null
+++ b/src/network.cpp
@@ -0,0 +1,224 @@
+#include "network.h"
+
+#include "identity.h"
+
+#include <cstring>
+
+#include <ifaddrs.h>
+#include <net/if.h>
+#include <unistd.h>
+
+using std::scoped_lock;
+using std::unique_lock;
+using std::unique_ptr;
+
+using namespace erebos;
+
+Server::Server(const Identity & self):
+	p(new Priv(self))
+{
+}
+
+Server::~Server() = default;
+
+Server::Priv::Priv(const Identity & self):
+	self(self)
+{
+	struct ifaddrs * raddrs;
+	if (getifaddrs(&raddrs) < 0)
+		throw std::system_error(errno, std::generic_category());
+	unique_ptr<ifaddrs, void(*)(ifaddrs *)> addrs(raddrs, freeifaddrs);
+
+	for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) {
+		if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET &&
+				ifa->ifa_flags & IFF_BROADCAST) {
+			bcastAddresses.push_back(((sockaddr_in*)ifa->ifa_broadaddr)->sin_addr);
+		}
+	}
+	
+	sock = socket(AF_INET, SOCK_DGRAM, 0);
+	if (sock < 0)
+		throw std::system_error(errno, std::generic_category());
+
+	int enable = 1;
+	if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST,
+				&enable, sizeof(enable)) < 0)
+		throw std::system_error(errno, std::generic_category());
+
+	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+				&enable, sizeof(enable)) < 0)
+		throw std::system_error(errno, std::generic_category());
+
+	sockaddr_in laddr = {};
+	laddr.sin_family = AF_INET;
+	laddr.sin_port = htons(discoveryPort);
+	if (bind(sock, (sockaddr *) &laddr, sizeof(laddr)) < 0)
+		throw std::system_error(errno, std::generic_category());
+
+	threadListen = thread([this] { doListen(); });
+	threadAnnounce = thread([this] { doAnnounce(); });
+}
+
+Server::Priv::~Priv()
+{
+	{
+		scoped_lock lock(dataMutex);
+		finish = true;
+	}
+
+	announceCondvar.notify_all();
+	threadListen.join();
+	threadAnnounce.join();
+
+	if (sock >= 0)
+		close(sock);
+}
+
+void Server::Priv::doListen()
+{
+}
+
+void Server::Priv::doAnnounce()
+{
+	unique_lock<mutex> lock(dataMutex);
+	auto lastAnnounce = steady_clock::now() - announceInterval;
+
+	while (!finish) {
+		auto now = steady_clock::now();
+
+		if (lastAnnounce + announceInterval < now) {
+			TransportHeader header({
+				{ TransportHeader::Type::AnnounceSelf, *self.ref() }
+			});
+
+			vector<uint8_t> bytes = header.store(self.ref()->storage())->encode();
+
+			for (const auto & in : bcastAddresses) {
+				sockaddr_in sin = {};
+				sin.sin_family = AF_INET;
+				sin.sin_addr = in;
+				sin.sin_port = htons(discoveryPort);
+				sendto(sock, bytes.data(), bytes.size(), 0, (sockaddr *) &sin, sizeof(sin));
+			}
+
+			lastAnnounce += announceInterval * ((now - lastAnnounce) / announceInterval);
+		}
+
+		announceCondvar.wait_until(lock, lastAnnounce + announceInterval);
+	}
+}
+
+optional<TransportHeader> TransportHeader::load(const Ref & ref)
+{
+	auto rec = ref->asRecord();
+	if (!rec)
+		return nullopt;
+
+	vector<Item> items;
+	for (const auto & item : rec->items()) {
+		if (item.name == "ACK") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::Acknowledged,
+					.value = *ref,
+				});
+		} else if (item.name == "REQ") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::DataRequest,
+					.value = *ref,
+				});
+		} else if (item.name == "RSP") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::DataResponse,
+					.value = *ref,
+				});
+		} else if (item.name == "ANN") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::AnnounceSelf,
+					.value = *ref,
+				});
+		} else if (item.name == "ANU") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::AnnounceUpdate,
+					.value = *ref,
+				});
+		} else if (item.name == "CRQ") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::ChannelRequest,
+					.value = *ref,
+				});
+		} else if (item.name == "CAC") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::ChannelAccept,
+					.value = *ref,
+				});
+		} else if (item.name == "STP") {
+			if (auto val = item.asText())
+				items.emplace_back(Item {
+					.type = Type::ServiceType,
+					.value = *val,
+				});
+		} else if (item.name == "SRF") {
+			if (auto ref = item.asRef())
+				items.emplace_back(Item {
+					.type = Type::ServiceRef,
+					.value = *ref,
+				});
+		}
+	}
+
+	return TransportHeader { .items = items };
+}
+
+Ref TransportHeader::store(const Storage & st) const
+{
+	vector<Record::Item> ritems;
+
+	for (const auto & item : items) {
+		switch (item.type) {
+		case Type::Acknowledged:
+			ritems.emplace_back("ACK", std::get<Ref>(item.value));
+			break;
+
+		case Type::DataRequest:
+			ritems.emplace_back("REQ", std::get<Ref>(item.value));
+			break;
+
+		case Type::DataResponse:
+			ritems.emplace_back("RSP", std::get<Ref>(item.value));
+			break;
+
+		case Type::AnnounceSelf:
+			ritems.emplace_back("ANN", std::get<Ref>(item.value));
+			break;
+
+		case Type::AnnounceUpdate:
+			ritems.emplace_back("ANU", std::get<Ref>(item.value));
+			break;
+
+		case Type::ChannelRequest:
+			ritems.emplace_back("CRQ", std::get<Ref>(item.value));
+			break;
+
+		case Type::ChannelAccept:
+			ritems.emplace_back("CAC", std::get<Ref>(item.value));
+			break;
+
+		case Type::ServiceType:
+			ritems.emplace_back("STP", std::get<string>(item.value));
+			break;
+
+		case Type::ServiceRef:
+			ritems.emplace_back("SRF", std::get<Ref>(item.value));
+			break;
+		}
+	}
+
+	return st.storeObject(Record(std::move(ritems)));
+}
diff --git a/src/network.h b/src/network.h
new file mode 100644
index 0000000..bf01cfb
--- /dev/null
+++ b/src/network.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <erebos/network.h>
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <netinet/in.h>
+
+using std::condition_variable;
+using std::mutex;
+using std::optional;
+using std::string;
+using std::thread;
+using std::variant;
+using std::vector;
+
+namespace chrono = std::chrono;
+using chrono::steady_clock;
+
+namespace erebos {
+
+struct TransportHeader
+{
+	enum class Type {
+		Acknowledged,
+		DataRequest,
+		DataResponse,
+		AnnounceSelf,
+		AnnounceUpdate,
+		ChannelRequest,
+		ChannelAccept,
+		ServiceType,
+		ServiceRef,
+	};
+
+	struct Item {
+		const Type type;
+		const variant<Ref, string> value;
+	};
+
+	TransportHeader(const vector<Item> & items): items(items) {}
+	static optional<TransportHeader> load(const Ref &);
+	Ref store(const Storage & st) const;
+
+	const vector<Item> items;
+};
+
+struct Server::Priv
+{
+	Priv(const Identity & self);
+	~Priv();
+	void doListen();
+	void doAnnounce();
+
+	constexpr static uint16_t discoveryPort { 29665 };
+	constexpr static chrono::seconds announceInterval { 60 };
+
+	mutex dataMutex;
+	condition_variable announceCondvar;
+	bool finish = false;
+
+	Identity self;
+	thread threadListen;
+	thread threadAnnounce;
+
+	int sock;
+	vector<in_addr> bcastAddresses;
+};
+
+}
-- 
cgit v1.2.3