diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2019-12-27 17:20:04 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2019-12-27 22:59:53 +0100 |
commit | ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 (patch) | |
tree | f4a09848ef09bfd816b8aefd6dc3b4e5a5440905 /src/network.cpp | |
parent | a02ef25970cb97ab4c29b3859799062431ae668b (diff) |
Network server sending announcements
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/src/network.cpp b/src/network.cpp new file mode 100644 index 0000000..40d1045 --- /dev/null +++ b/src/network.cpp @@ -0,0 +1,224 @@ +#include "network.h" + +#include "identity.h" + +#include <cstring> + +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> + +using std::scoped_lock; +using std::unique_lock; +using std::unique_ptr; + +using namespace erebos; + +Server::Server(const Identity & self): + p(new Priv(self)) +{ +} + +Server::~Server() = default; + +Server::Priv::Priv(const Identity & self): + self(self) +{ + struct ifaddrs * raddrs; + if (getifaddrs(&raddrs) < 0) + throw std::system_error(errno, std::generic_category()); + unique_ptr<ifaddrs, void(*)(ifaddrs *)> addrs(raddrs, freeifaddrs); + + for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET && + ifa->ifa_flags & IFF_BROADCAST) { + bcastAddresses.push_back(((sockaddr_in*)ifa->ifa_broadaddr)->sin_addr); + } + } + + sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) + throw std::system_error(errno, std::generic_category()); + + int enable = 1; + if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + &enable, sizeof(enable)) < 0) + throw std::system_error(errno, std::generic_category()); + + sockaddr_in laddr = {}; + laddr.sin_family = AF_INET; + laddr.sin_port = htons(discoveryPort); + if (bind(sock, (sockaddr *) &laddr, sizeof(laddr)) < 0) + throw std::system_error(errno, std::generic_category()); + + threadListen = thread([this] { doListen(); }); + threadAnnounce = thread([this] { doAnnounce(); }); +} + +Server::Priv::~Priv() +{ + { + scoped_lock lock(dataMutex); + finish = true; + } + + announceCondvar.notify_all(); + threadListen.join(); + threadAnnounce.join(); + + if (sock >= 0) + close(sock); +} + +void Server::Priv::doListen() +{ +} + +void Server::Priv::doAnnounce() +{ + unique_lock<mutex> lock(dataMutex); + auto lastAnnounce = steady_clock::now() - announceInterval; + + while (!finish) { + auto now = steady_clock::now(); + + if (lastAnnounce + announceInterval < now) { + TransportHeader header({ + { TransportHeader::Type::AnnounceSelf, *self.ref() } + }); + + vector<uint8_t> bytes = header.store(self.ref()->storage())->encode(); + + for (const auto & in : bcastAddresses) { + sockaddr_in sin = {}; + sin.sin_family = AF_INET; + sin.sin_addr = in; + sin.sin_port = htons(discoveryPort); + sendto(sock, bytes.data(), bytes.size(), 0, (sockaddr *) &sin, sizeof(sin)); + } + + lastAnnounce += announceInterval * ((now - lastAnnounce) / announceInterval); + } + + announceCondvar.wait_until(lock, lastAnnounce + announceInterval); + } +} + +optional<TransportHeader> TransportHeader::load(const Ref & ref) +{ + auto rec = ref->asRecord(); + if (!rec) + return nullopt; + + vector<Item> items; + for (const auto & item : rec->items()) { + if (item.name == "ACK") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::Acknowledged, + .value = *ref, + }); + } else if (item.name == "REQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataRequest, + .value = *ref, + }); + } else if (item.name == "RSP") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::DataResponse, + .value = *ref, + }); + } else if (item.name == "ANN") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceSelf, + .value = *ref, + }); + } else if (item.name == "ANU") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::AnnounceUpdate, + .value = *ref, + }); + } else if (item.name == "CRQ") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelRequest, + .value = *ref, + }); + } else if (item.name == "CAC") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ChannelAccept, + .value = *ref, + }); + } else if (item.name == "STP") { + if (auto val = item.asText()) + items.emplace_back(Item { + .type = Type::ServiceType, + .value = *val, + }); + } else if (item.name == "SRF") { + if (auto ref = item.asRef()) + items.emplace_back(Item { + .type = Type::ServiceRef, + .value = *ref, + }); + } + } + + return TransportHeader { .items = items }; +} + +Ref TransportHeader::store(const Storage & st) const +{ + vector<Record::Item> ritems; + + for (const auto & item : items) { + switch (item.type) { + case Type::Acknowledged: + ritems.emplace_back("ACK", std::get<Ref>(item.value)); + break; + + case Type::DataRequest: + ritems.emplace_back("REQ", std::get<Ref>(item.value)); + break; + + case Type::DataResponse: + ritems.emplace_back("RSP", std::get<Ref>(item.value)); + break; + + case Type::AnnounceSelf: + ritems.emplace_back("ANN", std::get<Ref>(item.value)); + break; + + case Type::AnnounceUpdate: + ritems.emplace_back("ANU", std::get<Ref>(item.value)); + break; + + case Type::ChannelRequest: + ritems.emplace_back("CRQ", std::get<Ref>(item.value)); + break; + + case Type::ChannelAccept: + ritems.emplace_back("CAC", std::get<Ref>(item.value)); + break; + + case Type::ServiceType: + ritems.emplace_back("STP", std::get<string>(item.value)); + break; + + case Type::ServiceRef: + ritems.emplace_back("SRF", std::get<Ref>(item.value)); + break; + } + } + + return st.storeObject(Record(std::move(ritems))); +} |