From ec402bfaa90cdb52276f5ccc2525e799cb4419d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 11 Jan 2020 16:13:40 +0100 Subject: Request missing data from network --- src/network.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 4 deletions(-) (limited to 'src/network.cpp') diff --git a/src/network.cpp b/src/network.cpp index 64f9ed3..bd1ea8e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,12 +2,14 @@ #include "identity.h" +#include #include #include #include #include +using std::holds_alternative; using std::scoped_lock; using std::unique_lock; @@ -88,12 +90,19 @@ void Server::Priv::doListen() if (ret < 0) throw std::system_error(errno, std::generic_category()); - auto peer = getPeer(paddr); + auto & peer = getPeer(paddr); if (auto dec = PartialObject::decodePrefix(peer.partStorage, buf.begin(), buf.begin() + ret)) { if (auto header = TransportHeader::load(std::get(*dec))) { + auto pos = std::get<1>(*dec); + while (auto cdec = PartialObject::decodePrefix(peer.partStorage, + pos, buf.begin() + ret)) { + peer.partStorage.storeObject(std::get(*cdec)); + pos = std::get<1>(*cdec); + } scoped_lock hlock(dataMutex); handlePacket(peer, *header); + peer.updateIdentity(); } } @@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr) Peer * peer = new Peer { .sock = sock, .addr = paddr, + .identity = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }; @@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) for (const auto & obj : collectStoredObjects(*Stored::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + vector replyHeaders; + vector replyBody; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - peer.send(TransportHeader({ hitem }), { **ref }); + replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref }); + replyBody.push_back(**ref); } } break; } case TransportHeader::Type::DataResponse: + if (auto pref = std::get(item.value)) { + replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref }); + for (auto & pwref : waiting) { + if (auto wref = pwref.lock()) { + if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != + wref->missing.end()) { + if (wref->check(&replyHeaders)) + pwref.reset(); + } + } + } + waiting.erase(std::remove_if(waiting.begin(), waiting.end(), + [](auto & wref) { return wref.expired(); }), waiting.end()); + } break; - case TransportHeader::Type::AnnounceSelf: + case TransportHeader::Type::AnnounceSelf: { + auto pref = std::get(item.value); + if (pref.digest() == self.ref()->digest()) + break; + + if (holds_alternative(peer.identity)) + replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + + shared_ptr wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(&replyHeaders); break; + } case TransportHeader::Type::AnnounceUpdate: break; @@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) } } + + if (!replyHeaders.empty()) + peer.send(TransportHeader(replyHeaders), replyBody); } -void Peer::send(const TransportHeader & header, const vector & objs) +void Peer::send(const TransportHeader & header, const vector & objs) const { vector data, part; @@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector & objs) (sockaddr *) &addr, sizeof(addr)); } +void Peer::updateIdentity() +{ + if (holds_alternative>(identity)) + if (auto ref = std::get>(identity)->check()) + if (auto id = Identity::load(*ref)) + identity.emplace(*id); +} + + +optional WaitingRef::check(vector * request) +{ + if (auto r = storage.ref(ref.digest())) + return *r; + + auto res = storage.copy(ref); + if (auto r = std::get_if(&res)) + return *r; + + missing = std::get>(res); + if (request) + for (const auto & d : missing) + request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + + return nullopt; +} + + optional TransportHeader::load(const PartialRef & ref) { return load(*ref); -- cgit v1.2.3