From ec402bfaa90cdb52276f5ccc2525e799cb4419d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 11 Jan 2020 16:13:40 +0100 Subject: Request missing data from network --- src/network.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- src/network.h | 24 ++++++++++++++++- src/storage.cpp | 34 +++++++++++++++-------- src/storage.h | 1 + 4 files changed, 126 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/network.cpp b/src/network.cpp index 64f9ed3..bd1ea8e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,12 +2,14 @@ #include "identity.h" +#include #include #include #include #include +using std::holds_alternative; using std::scoped_lock; using std::unique_lock; @@ -88,12 +90,19 @@ void Server::Priv::doListen() if (ret < 0) throw std::system_error(errno, std::generic_category()); - auto peer = getPeer(paddr); + auto & peer = getPeer(paddr); if (auto dec = PartialObject::decodePrefix(peer.partStorage, buf.begin(), buf.begin() + ret)) { if (auto header = TransportHeader::load(std::get(*dec))) { + auto pos = std::get<1>(*dec); + while (auto cdec = PartialObject::decodePrefix(peer.partStorage, + pos, buf.begin() + ret)) { + peer.partStorage.storeObject(std::get(*cdec)); + pos = std::get<1>(*cdec); + } scoped_lock hlock(dataMutex); handlePacket(peer, *header); + peer.updateIdentity(); } } @@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr) Peer * peer = new Peer { .sock = sock, .addr = paddr, + .identity = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }; @@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) for (const auto & obj : collectStoredObjects(*Stored::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + vector replyHeaders; + vector replyBody; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - peer.send(TransportHeader({ hitem }), { **ref }); + replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref }); + replyBody.push_back(**ref); } } break; } case TransportHeader::Type::DataResponse: + if (auto pref = std::get(item.value)) { + replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref }); + for (auto & pwref : waiting) { + if (auto wref = pwref.lock()) { + if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != + wref->missing.end()) { + if (wref->check(&replyHeaders)) + pwref.reset(); + } + } + } + waiting.erase(std::remove_if(waiting.begin(), waiting.end(), + [](auto & wref) { return wref.expired(); }), waiting.end()); + } break; - case TransportHeader::Type::AnnounceSelf: + case TransportHeader::Type::AnnounceSelf: { + auto pref = std::get(item.value); + if (pref.digest() == self.ref()->digest()) + break; + + if (holds_alternative(peer.identity)) + replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(&replyHeaders); break; + } case TransportHeader::Type::AnnounceUpdate: break; @@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) } } + + if (!replyHeaders.empty()) + peer.send(TransportHeader(replyHeaders), replyBody); } -void Peer::send(const TransportHeader & header, const vector & objs) +void Peer::send(const TransportHeader & header, const vector & objs) const { vector data, part; @@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector & objs) (sockaddr *) &addr, sizeof(addr)); } +void Peer::updateIdentity() +{ + if (holds_alternative>(identity)) + if (auto ref = std::get>(identity)->check()) + if (auto id = Identity::load(*ref)) + identity.emplace(*id); +} + + +optional WaitingRef::check(vector * request) +{ + if (auto r = storage.ref(ref.digest())) + return *r; + + auto res = storage.copy(ref); + if (auto r = std::get_if(&res)) + return *r; + + missing = std::get>(res); + if (request) + for (const auto & d : missing) + request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + + return nullopt; +} + + optional TransportHeader::load(const PartialRef & ref) { return load(*ref); diff --git a/src/network.h b/src/network.h index af202f6..bb32323 100644 --- a/src/network.h +++ b/src/network.h @@ -10,13 +10,16 @@ #include using std::condition_variable; +using std::monostate; using std::mutex; using std::optional; +using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::variant; using std::vector; +using std::weak_ptr; namespace chrono = std::chrono; using chrono::steady_clock; @@ -25,13 +28,21 @@ namespace erebos { struct Peer { + Peer(const Peer &) = delete; + Peer & operator=(const Peer &) = delete; + const int sock; const sockaddr_in addr; + variant, + Identity> identity; + Storage tempStorage; PartialStorage partStorage; - void send(const struct TransportHeader &, const vector &); + void send(const struct TransportHeader &, const vector &) const; + void updateIdentity(); }; struct TransportHeader @@ -61,6 +72,16 @@ struct TransportHeader const vector items; }; +struct WaitingRef +{ + const Storage storage; + const PartialRef ref; + const Peer & peer; + vector missing; + + optional check(vector * request = nullptr); +}; + struct Server::Priv { Priv(const Identity & self); @@ -84,6 +105,7 @@ struct Server::Priv vector> peers; vector outgoing; + vector> waiting; int sock; vector bcastAddresses; diff --git a/src/storage.cpp b/src/storage.cpp index 608f82b..6b2e4f8 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -316,6 +316,19 @@ optional Storage::ref(const Digest & digest) const return Ref::create(*this, digest); } +Digest PartialStorage::Priv::storeBytes(const vector & content) const +{ + array arr; + int ret = blake2b(arr.data(), content.data(), nullptr, + Digest::size, content.size(), 0); + if (ret != 0) + throw runtime_error("failed to compute digest"); + + Digest digest(arr); + backend->storeBytes(digest, content); + return digest; +} + optional> PartialStorage::Priv::loadBytes(const Digest & digest) const { auto ocontent = backend->loadBytes(digest); @@ -339,6 +352,15 @@ optional PartialStorage::loadObject(const Digest & digest) const return nullopt; } +PartialRef PartialStorage::storeObject(const PartialObject & obj) const +{ return ref(p->storeBytes(obj.encode())); } + +PartialRef PartialStorage::storeObject(const PartialRecord & val) const +{ return storeObject(PartialObject(val)); } + +PartialRef PartialStorage::storeObject(const Blob & val) const +{ return storeObject(PartialObject(val)); } + optional Storage::loadObject(const Digest & digest) const { if (auto content = p->loadBytes(digest)) @@ -380,17 +402,7 @@ optional Storage::Priv::copy(const ObjectT & pobj, vector * m if (fail) return nullopt; - auto content = pobj.encode(); - - array arr; - int ret = blake2b(arr.data(), content.data(), nullptr, - Digest::size, content.size(), 0); - if (ret != 0) - throw runtime_error("failed to compute digest"); - - Digest digest(arr); - backend->storeBytes(digest, content); - return digest; + return storeBytes(pobj.encode()); } variant> Storage::copy(const PartialRef & pref) const diff --git a/src/storage.h b/src/storage.h index 9e22a4a..86dc48f 100644 --- a/src/storage.h +++ b/src/storage.h @@ -102,6 +102,7 @@ struct Storage::Priv { shared_ptr backend; + Digest storeBytes(const vector &) const; optional> loadBytes(const Digest & digest) const; template -- cgit v1.2.3