diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2020-01-01 21:27:11 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2020-01-04 21:29:00 +0100 |
commit | b97b503408911130d24d7f07f9247dca8314a316 (patch) | |
tree | 3b4202b42bbe51b2cd5b3720e755a804d7872159 /src | |
parent | ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 (diff) |
Respond to data requests from network
Diffstat (limited to 'src')
-rw-r--r-- | src/network.cpp | 112 | ||||
-rw-r--r-- | src/network.h | 17 | ||||
-rw-r--r-- | src/storage.cpp | 67 | ||||
-rw-r--r-- | src/storage.h | 4 |
4 files changed, 187 insertions, 13 deletions
diff --git a/src/network.cpp b/src/network.cpp index 40d1045..b1beb5c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,7 +10,6 @@ using std::scoped_lock; using std::unique_lock; -using std::unique_ptr; using namespace erebos; @@ -76,6 +75,29 @@ Server::Priv::~Priv() void Server::Priv::doListen() { + vector<uint8_t> buf(4096); + unique_lock<mutex> lock(dataMutex); + + while (!finish) { + sockaddr_in paddr; + + lock.unlock(); + socklen_t addrlen = sizeof(paddr); + ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0, + (sockaddr *) &paddr, &addrlen); + if (ret < 0) + throw std::system_error(errno, std::generic_category()); + + if (auto dec = Object::decodePrefix(self.ref()->storage(), + buf.begin(), buf.begin() + ret)) { + if (auto header = TransportHeader::load(std::get<Object>(*dec))) { + scoped_lock<mutex> hlock(dataMutex); + handlePacket(getPeer(paddr), *header); + } + } + + lock.lock(); + } } void Server::Priv::doAnnounce() @@ -108,9 +130,85 @@ void Server::Priv::doAnnounce() } } +Peer & Server::Priv::getPeer(const sockaddr_in & paddr) +{ + for (auto & peer : peers) + if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0) + return *peer; + + Peer * peer = new Peer { .sock = sock, .addr = paddr }; + peers.emplace_back(peer); + return *peer; +} + +void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) +{ + unordered_set<Digest> plaintextRefs; + for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) + plaintextRefs.insert(obj.ref.digest()); + + for (auto & item : header.items) { + switch (item.type) { + case TransportHeader::Type::Acknowledged: + break; + + case TransportHeader::Type::DataRequest: { + auto ref = std::get<Ref>(item.value); + if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) { + TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref }; + peer.send(TransportHeader({ hitem }), { *ref }); + } + break; + } + + case TransportHeader::Type::DataResponse: + break; + + case TransportHeader::Type::AnnounceSelf: + break; + + case TransportHeader::Type::AnnounceUpdate: + break; + + case TransportHeader::Type::ChannelRequest: + break; + + case TransportHeader::Type::ChannelAccept: + break; + + case TransportHeader::Type::ServiceType: + break; + + case TransportHeader::Type::ServiceRef: + break; + + } + } +} + +void Peer::send(const TransportHeader & header, const vector<Object> & objs) +{ + vector<uint8_t> data, part; + + part = header.toObject().encode(); + data.insert(data.end(), part.begin(), part.end()); + for (const auto & obj : objs) { + part = obj.encode(); + data.insert(data.end(), part.begin(), part.end()); + } + + sendto(sock, data.data(), data.size(), 0, + (sockaddr *) &addr, sizeof(addr)); +} + optional<TransportHeader> TransportHeader::load(const Ref & ref) { - auto rec = ref->asRecord(); + return load(*ref); +} + +optional<TransportHeader> TransportHeader::load(const Object & obj) +{ + auto rec = obj.asRecord(); if (!rec) return nullopt; @@ -176,7 +274,7 @@ optional<TransportHeader> TransportHeader::load(const Ref & ref) return TransportHeader { .items = items }; } -Ref TransportHeader::store(const Storage & st) const +Object TransportHeader::toObject() const { vector<Record::Item> ritems; @@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const } } - return st.storeObject(Record(std::move(ritems))); + return Object(Record(std::move(ritems))); +} + +Ref TransportHeader::store(const Storage & st) const +{ + + return st.storeObject(toObject()); } diff --git a/src/network.h b/src/network.h index bf01cfb..ad29496 100644 --- a/src/network.h +++ b/src/network.h @@ -14,6 +14,7 @@ using std::mutex; using std::optional; using std::string; using std::thread; +using std::unique_ptr; using std::variant; using std::vector; @@ -22,6 +23,14 @@ using chrono::steady_clock; namespace erebos { +struct Peer +{ + const int sock; + const sockaddr_in addr; + + void send(const struct TransportHeader &, const vector<Object> &); +}; + struct TransportHeader { enum class Type { @@ -43,6 +52,8 @@ struct TransportHeader TransportHeader(const vector<Item> & items): items(items) {} static optional<TransportHeader> load(const Ref &); + static optional<TransportHeader> load(const Object &); + Object toObject() const; Ref store(const Storage & st) const; const vector<Item> items; @@ -55,6 +66,9 @@ struct Server::Priv void doListen(); void doAnnounce(); + Peer & getPeer(const sockaddr_in & paddr); + void handlePacket(Peer &, const TransportHeader &); + constexpr static uint16_t discoveryPort { 29665 }; constexpr static chrono::seconds announceInterval { 60 }; @@ -66,6 +80,9 @@ struct Server::Priv thread threadListen; thread threadAnnounce; + vector<unique_ptr<Peer>> peers; + vector<struct TransportHeader> outgoing; + int sock; vector<in_addr> bcastAddresses; }; diff --git a/src/storage.cpp b/src/storage.cpp index e611da0..0d9b52e 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -29,6 +29,7 @@ using std::runtime_error; using std::shared_ptr; using std::string; using std::to_string; +using std::tuple; optional<Storage> Storage::open(fs::path path) { @@ -492,29 +493,52 @@ Blob Blob::decode(Storage, return Blob(make_shared<vector<uint8_t>>(begin, end)); } - -optional<Object> Object::decode(Storage st, const vector<uint8_t> & data) +optional<tuple<Object, vector<uint8_t>::const_iterator>> +Object::decodePrefix(Storage st, + vector<uint8_t>::const_iterator begin, + vector<uint8_t>::const_iterator end) { - auto newline = std::find(data.begin(), data.end(), '\n'); - if (newline == data.end()) + auto newline = std::find(begin, end, '\n'); + if (newline == end) return nullopt; - auto space = std::find(data.begin(), newline, ' '); + auto space = std::find(begin, newline, ' '); if (space == newline) return nullopt; ssize_t size = std::stoi(string(space + 1, newline)); - if (data.end() - newline - 1 != size) + if (end - newline - 1 < size) return nullopt; + auto cend = newline + 1 + size; - string type(data.begin(), space); + string type(begin, space); + optional<Object> obj; if (type == "rec") - return Object(Record::decode(st, newline + 1, data.end())); + obj.emplace(Record::decode(st, newline + 1, cend)); else if (type == "blob") - return Object(Blob::decode(st, newline + 1, data.end())); + obj.emplace(Blob::decode(st, newline + 1, cend)); else throw runtime_error("unknown object type '" + type + "'"); + if (obj) + return std::make_tuple(*obj, cend); + return nullopt; +} + +optional<Object> Object::decode(Storage st, const vector<uint8_t> & data) +{ + return decode(st, data.begin(), data.end()); +} + +optional<Object> Object::decode(Storage st, + vector<uint8_t>::const_iterator begin, + vector<uint8_t>::const_iterator end) +{ + if (auto res = decodePrefix(st, begin, end)) { + auto [obj, next] = *res; + if (next == end) + return obj; + } return nullopt; } @@ -563,3 +587,28 @@ optional<Blob> Object::asBlob() const return std::get<Blob>(content); return nullopt; } + +vector<Stored<Object>> erebos::collectStoredObjects(const Stored<Object> & from) +{ + unordered_set<Digest> seen; + vector<Stored<Object>> queue { from }; + vector<Stored<Object>> res; + + while (!queue.empty()) { + auto cur = queue.back(); + queue.pop_back(); + + auto [it, added] = seen.insert(cur.ref.digest()); + if (!added) + continue; + + res.push_back(cur); + + if (auto rec = cur->asRecord()) + for (const auto & item : rec->items()) + if (auto ref = item.asRef()) + queue.push_back(*Stored<Object>::load(*ref)); + } + + return res; +} diff --git a/src/storage.h b/src/storage.h index e675848..68002fa 100644 --- a/src/storage.h +++ b/src/storage.h @@ -3,11 +3,13 @@ #include "erebos/storage.h" #include <future> +#include <unordered_set> namespace fs = std::filesystem; using std::optional; using std::shared_future; +using std::unordered_set; using std::vector; namespace erebos { @@ -32,4 +34,6 @@ struct Ref::Priv shared_future<Object> object; }; +vector<Stored<Object>> collectStoredObjects(const Stored<Object> &); + } |