diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2019-12-27 17:20:04 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2019-12-27 22:59:53 +0100 |
commit | ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 (patch) | |
tree | f4a09848ef09bfd816b8aefd6dc3b4e5a5440905 /src | |
parent | a02ef25970cb97ab4c29b3859799062431ae668b (diff) |
Network server sending announcements
Diffstat (limited to 'src')
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/network.cpp | 224 | ||||
-rw-r--r-- | src/network.h | 73 |
3 files changed, 298 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 090ec66..75eff66 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ include_directories( add_library(erebos identity + network pubkey storage ) diff --git a/src/network.cpp b/src/network.cpp new file mode 100644 index 0000000..40d1045 --- /dev/null +++ b/src/network.cpp @@ -0,0 +1,224 @@ +#include "network.h" + +#include "identity.h" + +#include <cstring> + +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> + +using std::scoped_lock; +using std::unique_lock; +using std::unique_ptr; + +using namespace erebos; + +Server::Server(const Identity & self): + p(new Priv(self)) +{ +} + +Server::~Server() = default; + +Server::Priv::Priv(const Identity & self): + self(self) +{ + struct ifaddrs * raddrs; + if (getifaddrs(&raddrs) < 0) + throw std::system_error(errno, std::generic_category()); + unique_ptr<ifaddrs, void(*)(ifaddrs *)> addrs(raddrs, freeifaddrs); + + for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET && + ifa->ifa_flags & IFF_BROADCAST) { + bcastAddresses.push_back(((sockaddr_in*)ifa->ifa_broadaddr)->sin_addr); + } + } + + sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) + throw std::system_error(errno, std::generic_category()); + + int enable = 1; + if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + sockaddr_in laddr = {}; + laddr.sin_family = AF_INET; + laddr.sin_port = htons(discoveryPort); + if (bind(sock, (sockaddr *) &laddr, sizeof(laddr)) < 0) + throw std::system_error(errno, std::generic_category()); + + threadListen = thread([this] { doListen(); }); + threadAnnounce = thread([this] { doAnnounce(); }); +} + +Server::Priv::~Priv() +{ + { + scoped_lock lock(dataMutex); + finish = true; + } + + announceCondvar.notify_all(); + threadListen.join(); + threadAnnounce.join(); + + if (sock >= 0) + close(sock); +} + +void Server::Priv::doListen() +{ +} + +void Server::Priv::doAnnounce() +{ + unique_lock<mutex> lock(dataMutex); + auto lastAnnounce = steady_clock::now() - announceInterval; + + while (!finish) { + auto now = steady_clock::now(); + + if (lastAnnounce + announceInterval < now) { + TransportHeader header({ + { TransportHeader::Type::AnnounceSelf, *self.ref() } + }); + + vector<uint8_t> bytes = header.store(self.ref()->storage())->encode(); + + for (const auto & in : bcastAddresses) { + sockaddr_in sin = {}; + sin.sin_family = AF_INET; + sin.sin_addr = in; + sin.sin_port = htons(discoveryPort); + sendto(sock, bytes.data(), bytes.size(), 0, (sockaddr *) &sin, sizeof(sin)); + } + + lastAnnounce += announceInterval * ((now - lastAnnounce) / announceInterval); + } + + announceCondvar.wait_until(lock, lastAnnounce + announceInterval); + } +} + +optional<TransportHeader> TransportHeader::load(const Ref & ref) +{ + auto rec = ref->asRecord(); + if (!rec) + return nullopt; + + vector<Item> items; + for (const auto & item : rec->items()) { + if (item.name == "ACK") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::Acknowledged, + .value = *ref, + }); + } else if (item.name == "REQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataRequest, + .value = *ref, + }); + } else if (item.name == "RSP") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataResponse, + .value = *ref, + }); + } else if (item.name == "ANN") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceSelf, + .value = *ref, + }); + } else if (item.name == "ANU") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceUpdate, + .value = *ref, + }); + } else if (item.name == "CRQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelRequest, + .value = *ref, + }); + } else if (item.name == "CAC") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelAccept, + .value = *ref, + }); + } else if (item.name == "STP") { + if (auto val = item.asText()) + items.emplace_back(Item { + .type = Type::ServiceType, + .value = *val, + }); + } else if (item.name == "SRF") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ServiceRef, + .value = *ref, + }); + } + } + + return TransportHeader { .items = items }; +} + +Ref TransportHeader::store(const Storage & st) const +{ + vector<Record::Item> ritems; + + for (const auto & item : items) { + switch (item.type) { + case Type::Acknowledged: + ritems.emplace_back("ACK", std::get<Ref>(item.value)); + break; + + case Type::DataRequest: + ritems.emplace_back("REQ", std::get<Ref>(item.value)); + break; + + case Type::DataResponse: + ritems.emplace_back("RSP", std::get<Ref>(item.value)); + break; + + case Type::AnnounceSelf: + ritems.emplace_back("ANN", std::get<Ref>(item.value)); + break; + + case Type::AnnounceUpdate: + ritems.emplace_back("ANU", std::get<Ref>(item.value)); + break; + + case Type::ChannelRequest: + ritems.emplace_back("CRQ", std::get<Ref>(item.value)); + break; + + case Type::ChannelAccept: + ritems.emplace_back("CAC", std::get<Ref>(item.value)); + break; + + case Type::ServiceType: + ritems.emplace_back("STP", std::get<string>(item.value)); + break; + + case Type::ServiceRef: + ritems.emplace_back("SRF", std::get<Ref>(item.value)); + break; + } + } + + return st.storeObject(Record(std::move(ritems))); +} diff --git a/src/network.h b/src/network.h new file mode 100644 index 0000000..bf01cfb --- /dev/null +++ b/src/network.h @@ -0,0 +1,73 @@ +#pragma once + +#include <erebos/network.h> + +#include <condition_variable> +#include <mutex> +#include <thread> +#include <vector> + +#include <netinet/in.h> + +using std::condition_variable; +using std::mutex; +using std::optional; +using std::string; +using std::thread; +using std::variant; +using std::vector; + +namespace chrono = std::chrono; +using chrono::steady_clock; + +namespace erebos { + +struct TransportHeader +{ + enum class Type { + Acknowledged, + DataRequest, + DataResponse, + AnnounceSelf, + AnnounceUpdate, + ChannelRequest, + ChannelAccept, + ServiceType, + ServiceRef, + }; + + struct Item { + const Type type; + const variant<Ref, string> value; + }; + + TransportHeader(const vector<Item> & items): items(items) {} + static optional<TransportHeader> load(const Ref &); + Ref store(const Storage & st) const; + + const vector<Item> items; +}; + +struct Server::Priv +{ + Priv(const Identity & self); + ~Priv(); + void doListen(); + void doAnnounce(); + + constexpr static uint16_t discoveryPort { 29665 }; + constexpr static chrono::seconds announceInterval { 60 }; + + mutex dataMutex; + condition_variable announceCondvar; + bool finish = false; + + Identity self; + thread threadListen; + thread threadAnnounce; + + int sock; + vector<in_addr> bcastAddresses; +}; + +} |