From ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Fri, 27 Dec 2019 17:20:04 +0100 Subject: Network server sending announcements --- include/erebos/network.h | 18 ++++ include/erebos/storage.h | 6 +- src/CMakeLists.txt | 1 + src/network.cpp | 224 +++++++++++++++++++++++++++++++++++++++++++++++ src/network.h | 73 +++++++++++++++ 5 files changed, 318 insertions(+), 4 deletions(-) create mode 100644 include/erebos/network.h create mode 100644 src/network.cpp create mode 100644 src/network.h diff --git a/include/erebos/network.h b/include/erebos/network.h new file mode 100644 index 0000000..c29096f --- /dev/null +++ b/include/erebos/network.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace erebos { + +class Server +{ +public: + Server(const Identity &); + ~Server(); + +private: + struct Priv; + const std::shared_ptr p; +}; + +}; diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 3777572..95a4574 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -124,10 +124,8 @@ public: template std::optional> as() const; - private: - friend class Record; - std::string name; - Variant value; + const std::string name; + const Variant value; }; private: diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 090ec66..75eff66 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ include_directories( add_library(erebos identity + network pubkey storage ) diff --git a/src/network.cpp b/src/network.cpp new file mode 100644 index 0000000..40d1045 --- /dev/null +++ b/src/network.cpp @@ -0,0 +1,224 @@ +#include "network.h" + +#include "identity.h" + +#include + +#include +#include +#include + +using std::scoped_lock; +using std::unique_lock; +using std::unique_ptr; + +using namespace erebos; + +Server::Server(const Identity & self): + p(new Priv(self)) +{ +} + +Server::~Server() = default; + +Server::Priv::Priv(const Identity & self): + self(self) +{ + struct ifaddrs * raddrs; + if (getifaddrs(&raddrs) < 0) + throw std::system_error(errno, std::generic_category()); + unique_ptr addrs(raddrs, freeifaddrs); + + for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET && + ifa->ifa_flags & IFF_BROADCAST) { + bcastAddresses.push_back(((sockaddr_in*)ifa->ifa_broadaddr)->sin_addr); + } + } + + sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) + throw std::system_error(errno, std::generic_category()); + + int enable = 1; + if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + sockaddr_in laddr = {}; + laddr.sin_family = AF_INET; + laddr.sin_port = htons(discoveryPort); + if (bind(sock, (sockaddr *) &laddr, sizeof(laddr)) < 0) + throw std::system_error(errno, std::generic_category()); + + threadListen = thread([this] { doListen(); }); + threadAnnounce = thread([this] { doAnnounce(); }); +} + +Server::Priv::~Priv() +{ + { + scoped_lock lock(dataMutex); + finish = true; + } + + announceCondvar.notify_all(); + threadListen.join(); + threadAnnounce.join(); + + if (sock >= 0) + close(sock); +} + +void Server::Priv::doListen() +{ +} + +void Server::Priv::doAnnounce() +{ + unique_lock lock(dataMutex); + auto lastAnnounce = steady_clock::now() - announceInterval; + + while (!finish) { + auto now = steady_clock::now(); + + if (lastAnnounce + announceInterval < now) { + TransportHeader header({ + { TransportHeader::Type::AnnounceSelf, *self.ref() } + }); + + vector bytes = header.store(self.ref()->storage())->encode(); + + for (const auto & in : bcastAddresses) { + sockaddr_in sin = {}; + sin.sin_family = AF_INET; + sin.sin_addr = in; + sin.sin_port = htons(discoveryPort); + sendto(sock, bytes.data(), bytes.size(), 0, (sockaddr *) &sin, sizeof(sin)); + } + + lastAnnounce += announceInterval * ((now - lastAnnounce) / announceInterval); + } + + announceCondvar.wait_until(lock, lastAnnounce + announceInterval); + } +} + +optional TransportHeader::load(const Ref & ref) +{ + auto rec = ref->asRecord(); + if (!rec) + return nullopt; + + vector items; + for (const auto & item : rec->items()) { + if (item.name == "ACK") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::Acknowledged, + .value = *ref, + }); + } else if (item.name == "REQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataRequest, + .value = *ref, + }); + } else if (item.name == "RSP") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataResponse, + .value = *ref, + }); + } else if (item.name == "ANN") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceSelf, + .value = *ref, + }); + } else if (item.name == "ANU") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceUpdate, + .value = *ref, + }); + } else if (item.name == "CRQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelRequest, + .value = *ref, + }); + } else if (item.name == "CAC") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelAccept, + .value = *ref, + }); + } else if (item.name == "STP") { + if (auto val = item.asText()) + items.emplace_back(Item { + .type = Type::ServiceType, + .value = *val, + }); + } else if (item.name == "SRF") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ServiceRef, + .value = *ref, + }); + } + } + + return TransportHeader { .items = items }; +} + +Ref TransportHeader::store(const Storage & st) const +{ + vector ritems; + + for (const auto & item : items) { + switch (item.type) { + case Type::Acknowledged: + ritems.emplace_back("ACK", std::get(item.value)); + break; + + case Type::DataRequest: + ritems.emplace_back("REQ", std::get(item.value)); + break; + + case Type::DataResponse: + ritems.emplace_back("RSP", std::get(item.value)); + break; + + case Type::AnnounceSelf: + ritems.emplace_back("ANN", std::get(item.value)); + break; + + case Type::AnnounceUpdate: + ritems.emplace_back("ANU", std::get(item.value)); + break; + + case Type::ChannelRequest: + ritems.emplace_back("CRQ", std::get(item.value)); + break; + + case Type::ChannelAccept: + ritems.emplace_back("CAC", std::get(item.value)); + break; + + case Type::ServiceType: + ritems.emplace_back("STP", std::get(item.value)); + break; + + case Type::ServiceRef: + ritems.emplace_back("SRF", std::get(item.value)); + break; + } + } + + return st.storeObject(Record(std::move(ritems))); +} diff --git a/src/network.h b/src/network.h new file mode 100644 index 0000000..bf01cfb --- /dev/null +++ b/src/network.h @@ -0,0 +1,73 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include + +using std::condition_variable; +using std::mutex; +using std::optional; +using std::string; +using std::thread; +using std::variant; +using std::vector; + +namespace chrono = std::chrono; +using chrono::steady_clock; + +namespace erebos { + +struct TransportHeader +{ + enum class Type { + Acknowledged, + DataRequest, + DataResponse, + AnnounceSelf, + AnnounceUpdate, + ChannelRequest, + ChannelAccept, + ServiceType, + ServiceRef, + }; + + struct Item { + const Type type; + const variant value; + }; + + TransportHeader(const vector & items): items(items) {} + static optional load(const Ref &); + Ref store(const Storage & st) const; + + const vector items; +}; + +struct Server::Priv +{ + Priv(const Identity & self); + ~Priv(); + void doListen(); + void doAnnounce(); + + constexpr static uint16_t discoveryPort { 29665 }; + constexpr static chrono::seconds announceInterval { 60 }; + + mutex dataMutex; + condition_variable announceCondvar; + bool finish = false; + + Identity self; + thread threadListen; + thread threadAnnounce; + + int sock; + vector bcastAddresses; +}; + +} -- cgit v1.2.3