#include "network.h"

#include "identity.h"

#include <cerrno>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <utility>

#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

using std::scoped_lock;
using std::unique_lock;
using std::unique_ptr;

using namespace erebos;

/// Creates the server for the given identity; all work happens in Priv.
Server::Server(const Identity & self):
	p(new Priv(self))
{
}

Server::~Server() = default;

/**
 * Sets up discovery networking for `self`:
 *  - records the IPv4 broadcast address of every broadcast-capable interface,
 *  - opens a UDP socket with SO_BROADCAST and SO_REUSEADDR bound to
 *    discoveryPort,
 *  - starts the listening and announcing threads.
 *
 * @throws std::system_error (generic category, errno value) when
 *         getifaddrs, socket, setsockopt or bind fails.
 */
Server::Priv::Priv(const Identity & self):
	self(self)
{
	struct ifaddrs * raddrs;
	if (getifaddrs(&raddrs) < 0)
		throw std::system_error(errno, std::generic_category());
	// Owns the interface list so it is freed on every exit path.
	unique_ptr<struct ifaddrs, decltype(&freeifaddrs)> addrs(raddrs, freeifaddrs);

	for (struct ifaddrs * ifa = addrs.get(); ifa; ifa = ifa->ifa_next) {
		const bool isIPv4 = ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET;
		const bool hasBroadcast = (ifa->ifa_flags & IFF_BROADCAST) != 0;
		// ifa_broadaddr may be null even when the flag is set, so check it
		// before dereferencing.
		if (isIPv4 && hasBroadcast && ifa->ifa_broadaddr) {
			bcastAddresses.push_back(
					reinterpret_cast<sockaddr_in *>(ifa->ifa_broadaddr)->sin_addr);
		}
	}

	sock = socket(AF_INET, SOCK_DGRAM, 0);
	if (sock < 0)
		throw std::system_error(errno, std::generic_category());

	// ~Priv() does not run when this constructor throws, so the descriptor
	// must be closed here before the error is reported. errno is saved first
	// because close() may overwrite it.
	const auto failWithErrno = [this] {
		const int err = errno;
		close(sock);
		sock = -1;
		throw std::system_error(err, std::generic_category());
	};

	int enable = 1;
	if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST,
				&enable, sizeof(enable)) < 0)
		failWithErrno();

	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				&enable, sizeof(enable)) < 0)
		failWithErrno();

	sockaddr_in laddr = {};
	laddr.sin_family = AF_INET;
	laddr.sin_port = htons(discoveryPort);
	if (bind(sock, reinterpret_cast<sockaddr *>(&laddr), sizeof(laddr)) < 0)
		failWithErrno();

	threadListen = thread([this] { doListen(); });
	threadAnnounce = thread([this] { doAnnounce(); });
}

/**
 * Signals both worker threads to finish, waits for them and closes the
 * socket.
 */
Server::Priv::~Priv()
{
	{
		// `finish` is read by doAnnounce() under the same mutex.
		scoped_lock lock(dataMutex);
		finish = true;
	}

	// Wake the announcer if it is waiting for its next deadline.
	announceCondvar.notify_all();
	threadListen.join();
	threadAnnounce.join();

	if (sock >= 0)
		close(sock);
}

/// Body of the listening thread; currently does nothing and returns at once.
void Server::Priv::doListen()
{
}

/**
 * Body of the announcing thread.
 *
 * Until `finish` is set, sends an AnnounceSelf transport header for the
 * local identity to every collected broadcast address once per
 * announceInterval. Between rounds it waits on announceCondvar so that the
 * destructor can wake it up early.
 */
void Server::Priv::doAnnounce()
{
	unique_lock lock(dataMutex);
	// Start one interval in the past so the first announce goes out at once.
	auto lastAnnounce = steady_clock::now() - announceInterval;

	while (!finish) {
		const auto now = steady_clock::now();

		if (lastAnnounce + announceInterval < now) {
			TransportHeader header({
				{ TransportHeader::Type::AnnounceSelf, *self.ref() }
			});

			const auto bytes = header.store(self.ref()->storage())->encode();

			for (const auto & in : bcastAddresses) {
				sockaddr_in sin = {};
				sin.sin_family = AF_INET;
				sin.sin_addr = in;
				sin.sin_port = htons(discoveryPort);
				// The result is not checked: a failed send is simply
				// followed by the next periodic announce.
				sendto(sock, bytes.data(), bytes.size(), 0,
						reinterpret_cast<sockaddr *>(&sin), sizeof(sin));
			}

			// Advance by a whole number of intervals so the schedule stays
			// aligned even when some rounds were missed.
			lastAnnounce += announceInterval *
				((now - lastAnnounce) / announceInterval);
		}

		announceCondvar.wait_until(lock, lastAnnounce + announceInterval);
	}
}

/**
 * Parses a transport header from the record stored at `ref`.
 *
 * Each record item with a known name becomes one header item. Items with an
 * unknown name, or whose value does not have the expected kind, are skipped.
 *
 * @return the parsed header, or nullopt when `ref` is not a record.
 */
optional<TransportHeader> TransportHeader::load(const Ref & ref)
{
	auto rec = ref->asRecord();
	if (!rec)
		return nullopt;

	// Record item names whose value is a reference, with the header item
	// type each one maps to.
	struct RefItemName {
		const char * name;
		Type type;
	};
	static constexpr RefItemName refItemNames[] = {
		{ "ACK", Type::Acknowledged },
		{ "REQ", Type::DataRequest },
		{ "RSP", Type::DataResponse },
		{ "ANN", Type::AnnounceSelf },
		{ "ANU", Type::AnnounceUpdate },
		{ "CRQ", Type::ChannelRequest },
		{ "CAC", Type::ChannelAccept },
		{ "SRF", Type::ServiceRef },
	};

	vector<Item> items;
	for (const auto & item : rec->items()) {
		// "STP" is the only item that carries text instead of a reference.
		if (item.name == "STP") {
			if (auto text = item.asText())
				items.emplace_back(Item {
					.type = Type::ServiceType,
					.value = *text,
				});
			continue;
		}

		for (const auto & known : refItemNames) {
			if (item.name != known.name)
				continue;
			if (auto value = item.asRef())
				items.emplace_back(Item {
					.type = known.type,
					.value = *value,
				});
			break;
		}
	}

	return TransportHeader { .items = std::move(items) };
}

/**
 * Stores this header as a record in `st`.
 *
 * Every header item is written as one record item, named by the item's type
 * (the same names load() recognises).
 *
 * @return reference to the stored record object.
 */
Ref TransportHeader::store(const Storage & st) const
{
	vector<Record::Item> ritems;
	ritems.reserve(items.size());

	for (const auto & item : items) {
		switch (item.type) {
		case Type::Acknowledged:
			ritems.emplace_back("ACK", std::get<Ref>(item.value));
			break;

		case Type::DataRequest:
			ritems.emplace_back("REQ", std::get<Ref>(item.value));
			break;

		case Type::DataResponse:
			ritems.emplace_back("RSP", std::get<Ref>(item.value));
			break;

		case Type::AnnounceSelf:
			ritems.emplace_back("ANN", std::get<Ref>(item.value));
			break;

		case Type::AnnounceUpdate:
			ritems.emplace_back("ANU", std::get<Ref>(item.value));
			break;

		case Type::ChannelRequest:
			ritems.emplace_back("CRQ", std::get<Ref>(item.value));
			break;

		case Type::ChannelAccept:
			ritems.emplace_back("CAC", std::get<Ref>(item.value));
			break;

		case Type::ServiceType:
			// The service type is the only text-valued item (see load()).
			ritems.emplace_back("STP", std::get<std::string>(item.value));
			break;

		case Type::ServiceRef:
			ritems.emplace_back("SRF", std::get<Ref>(item.value));
			break;
		}
	}

	return st.storeObject(Record(std::move(ritems)));
}