From b97b503408911130d24d7f07f9247dca8314a316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 1 Jan 2020 21:27:11 +0100 Subject: Respond to data requests from network --- include/erebos/storage.h | 22 ++++++++++ src/network.cpp | 112 +++++++++++++++++++++++++++++++++++++++++++++-- src/network.h | 17 +++++++ src/storage.cpp | 67 ++++++++++++++++++++++++---- src/storage.h | 4 ++ 5 files changed, 209 insertions(+), 13 deletions(-) diff --git a/include/erebos/storage.h b/include/erebos/storage.h index 95a4574..edb0aca 100644 --- a/include/erebos/storage.h +++ b/include/erebos/storage.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -183,7 +184,15 @@ public: Object(Variants content): content(content) {} Object & operator=(const Object &) = delete; + static std::optional::const_iterator>> + decodePrefix(Storage, std::vector::const_iterator, + std::vector::const_iterator); + static std::optional decode(Storage, const std::vector &); + static std::optional decode(Storage, + std::vector::const_iterator, + std::vector::const_iterator); + static std::vector decodeMany(Storage, const std::vector &); std::vector encode() const; static std::optional load(const Ref &); @@ -297,3 +306,16 @@ bool Stored::precedes(const Stored & other) const } } + +namespace std +{ + template<> struct hash + { + std::size_t operator()(const erebos::Digest & dgst) const noexcept + { + std::size_t res; + std::memcpy(&res, dgst.arr().data(), sizeof res); + return res; + } + }; +} diff --git a/src/network.cpp b/src/network.cpp index 40d1045..b1beb5c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,7 +10,6 @@ using std::scoped_lock; using std::unique_lock; -using std::unique_ptr; using namespace erebos; @@ -76,6 +75,29 @@ Server::Priv::~Priv() void Server::Priv::doListen() { + vector buf(4096); + unique_lock lock(dataMutex); + + while (!finish) { + sockaddr_in paddr; + + lock.unlock(); + socklen_t addrlen = sizeof(paddr); + ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0, + (sockaddr *) &paddr, &addrlen); + if (ret < 0) + throw std::system_error(errno, std::generic_category()); + + if (auto dec = Object::decodePrefix(self.ref()->storage(), + buf.begin(), buf.begin() + ret)) { + if (auto header = TransportHeader::load(std::get(*dec))) { + scoped_lock hlock(dataMutex); + handlePacket(getPeer(paddr), *header); + } + } + + lock.lock(); + } } void Server::Priv::doAnnounce() @@ -108,9 +130,85 @@ void Server::Priv::doAnnounce() } } +Peer & Server::Priv::getPeer(const sockaddr_in & paddr) +{ + for (auto & peer : peers) + if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0) + return *peer; + + Peer * peer = new Peer { .sock = sock, .addr = paddr }; + peers.emplace_back(peer); + return *peer; +} + +void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) +{ + unordered_set plaintextRefs; + for (const auto & obj : collectStoredObjects(*Stored::load(*self.ref()))) + plaintextRefs.insert(obj.ref.digest()); + + for (auto & item : header.items) { + switch (item.type) { + case TransportHeader::Type::Acknowledged: + break; + + case TransportHeader::Type::DataRequest: { + auto ref = std::get(item.value); + if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) { + TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref }; + peer.send(TransportHeader({ hitem }), { *ref }); + } + break; + } + + case TransportHeader::Type::DataResponse: + break; + + case TransportHeader::Type::AnnounceSelf: + break; + + case TransportHeader::Type::AnnounceUpdate: + break; + + case TransportHeader::Type::ChannelRequest: + break; + + case TransportHeader::Type::ChannelAccept: + break; + + case TransportHeader::Type::ServiceType: + break; + + case TransportHeader::Type::ServiceRef: + break; + + } + } +} + +void Peer::send(const TransportHeader & header, const vector & objs) +{ + vector data, part; + + part = header.toObject().encode(); + data.insert(data.end(), part.begin(), part.end()); + for (const auto & obj : objs) { + part = obj.encode(); + data.insert(data.end(), part.begin(), part.end()); + } + + sendto(sock, data.data(), data.size(), 0, + (sockaddr *) &addr, sizeof(addr)); +} + optional TransportHeader::load(const Ref & ref) { - auto rec = ref->asRecord(); + return load(*ref); +} + +optional TransportHeader::load(const Object & obj) +{ + auto rec = obj.asRecord(); if (!rec) return nullopt; @@ -176,7 +274,7 @@ optional TransportHeader::load(const Ref & ref) return TransportHeader { .items = items }; } -Ref TransportHeader::store(const Storage & st) const +Object TransportHeader::toObject() const { vector ritems; @@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const } } - return st.storeObject(Record(std::move(ritems))); + return Object(Record(std::move(ritems))); +} + +Ref TransportHeader::store(const Storage & st) const +{ + + return st.storeObject(toObject()); } diff --git a/src/network.h b/src/network.h index bf01cfb..ad29496 100644 --- a/src/network.h +++ b/src/network.h @@ -14,6 +14,7 @@ using std::mutex; using std::optional; using std::string; using std::thread; +using std::unique_ptr; using std::variant; using std::vector; @@ -22,6 +23,14 @@ using chrono::steady_clock; namespace erebos { +struct Peer +{ + const int sock; + const sockaddr_in addr; + + void send(const struct TransportHeader &, const vector &); +}; + struct TransportHeader { enum class Type { @@ -43,6 +52,8 @@ struct TransportHeader TransportHeader(const vector & items): items(items) {} static optional load(const Ref &); + static optional load(const Object &); + Object toObject() const; Ref store(const Storage & st) const; const vector items; @@ -55,6 +66,9 @@ struct Server::Priv void doListen(); void doAnnounce(); + Peer & getPeer(const sockaddr_in & paddr); + void handlePacket(Peer &, const TransportHeader &); + constexpr static uint16_t discoveryPort { 29665 }; constexpr static chrono::seconds announceInterval { 60 }; @@ -66,6 +80,9 @@ struct Server::Priv thread threadListen; thread threadAnnounce; + vector> peers; + vector outgoing; + int sock; vector bcastAddresses; }; diff --git a/src/storage.cpp b/src/storage.cpp index e611da0..0d9b52e 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -29,6 +29,7 @@ using std::runtime_error; using std::shared_ptr; using std::string; using std::to_string; +using std::tuple; optional Storage::open(fs::path path) { @@ -492,29 +493,52 @@ Blob Blob::decode(Storage, return Blob(make_shared>(begin, end)); } - -optional Object::decode(Storage st, const vector & data) +optional::const_iterator>> +Object::decodePrefix(Storage st, + vector::const_iterator begin, + vector::const_iterator end) { - auto newline = std::find(data.begin(), data.end(), '\n'); - if (newline == data.end()) + auto newline = std::find(begin, end, '\n'); + if (newline == end) return nullopt; - auto space = std::find(data.begin(), newline, ' '); + auto space = std::find(begin, newline, ' '); if (space == newline) return nullopt; ssize_t size = std::stoi(string(space + 1, newline)); - if (data.end() - newline - 1 != size) + if (end - newline - 1 < size) return nullopt; + auto cend = newline + 1 + size; - string type(data.begin(), space); + string type(begin, space); + optional obj; if (type == "rec") - return Object(Record::decode(st, newline + 1, data.end())); + obj.emplace(Record::decode(st, newline + 1, cend)); else if (type == "blob") - return Object(Blob::decode(st, newline + 1, data.end())); + obj.emplace(Blob::decode(st, newline + 1, cend)); else throw runtime_error("unknown object type '" + type + "'"); + if (obj) + return std::make_tuple(*obj, cend); + return nullopt; +} + +optional Object::decode(Storage st, const vector & data) +{ + return decode(st, data.begin(), data.end()); +} + +optional Object::decode(Storage st, + vector::const_iterator begin, + vector::const_iterator end) +{ + if (auto res = decodePrefix(st, begin, end)) { + auto [obj, next] = *res; + if (next == end) + return obj; + } return nullopt; } @@ -563,3 +587,28 @@ optional Object::asBlob() const return std::get(content); return nullopt; } + +vector> erebos::collectStoredObjects(const Stored & from) +{ + unordered_set seen; + vector> queue { from }; + vector> res; + + while (!queue.empty()) { + auto cur = queue.back(); + queue.pop_back(); + + auto [it, added] = seen.insert(cur.ref.digest()); + if (!added) + continue; + + res.push_back(cur); + + if (auto rec = cur->asRecord()) + for (const auto & item : rec->items()) + if (auto ref = item.asRef()) + queue.push_back(*Stored::load(*ref)); + } + + return res; +} diff --git a/src/storage.h b/src/storage.h index e675848..68002fa 100644 --- a/src/storage.h +++ b/src/storage.h @@ -3,11 +3,13 @@ #include "erebos/storage.h" #include +#include namespace fs = std::filesystem; using std::optional; using std::shared_future; +using std::unordered_set; using std::vector; namespace erebos { @@ -32,4 +34,6 @@ struct Ref::Priv shared_future object; }; +vector> collectStoredObjects(const Stored &); + } -- cgit v1.2.3