diff options
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 83 |
1 files changed, 79 insertions, 4 deletions
diff --git a/src/network.cpp b/src/network.cpp index 64f9ed3..bd1ea8e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,12 +2,14 @@ #include "identity.h" +#include <algorithm> #include <cstring> #include <ifaddrs.h> #include <net/if.h> #include <unistd.h> +using std::holds_alternative; using std::scoped_lock; using std::unique_lock; @@ -88,12 +90,19 @@ void Server::Priv::doListen() if (ret < 0) throw std::system_error(errno, std::generic_category()); - auto peer = getPeer(paddr); + auto & peer = getPeer(paddr); if (auto dec = PartialObject::decodePrefix(peer.partStorage, buf.begin(), buf.begin() + ret)) { if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { + auto pos = std::get<1>(*dec); + while (auto cdec = PartialObject::decodePrefix(peer.partStorage, + pos, buf.begin() + ret)) { + peer.partStorage.storeObject(std::get<PartialObject>(*cdec)); + pos = std::get<1>(*cdec); + } scoped_lock<mutex> hlock(dataMutex); handlePacket(peer, *header); + peer.updateIdentity(); } } @@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr) Peer * peer = new Peer { .sock = sock, .addr = paddr, + .identity = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }; @@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + vector<TransportHeader::Item> replyHeaders; + vector<Object> replyBody; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - peer.send(TransportHeader({ hitem }), { **ref }); + replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref }); + replyBody.push_back(**ref); } } break; } case TransportHeader::Type::DataResponse: + if (auto pref = std::get<PartialRef>(item.value)) { + replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref }); + for (auto & pwref : waiting) { + if (auto wref = pwref.lock()) { + if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != + wref->missing.end()) { + if (wref->check(&replyHeaders)) + pwref.reset(); + } + } + } + waiting.erase(std::remove_if(waiting.begin(), waiting.end(), + [](auto & wref) { return wref.expired(); }), waiting.end()); + } break; - case TransportHeader::Type::AnnounceSelf: + case TransportHeader::Type::AnnounceSelf: { + auto pref = std::get<PartialRef>(item.value); + if (pref.digest() == self.ref()->digest()) + break; + + if (holds_alternative<monostate>(peer.identity)) + replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(&replyHeaders); break; + } case TransportHeader::Type::AnnounceUpdate: break; @@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) } } + + if (!replyHeaders.empty()) + peer.send(TransportHeader(replyHeaders), replyBody); } -void Peer::send(const TransportHeader & header, const vector<Object> & objs) +void Peer::send(const TransportHeader & header, const vector<Object> & objs) const { vector<uint8_t> data, part; @@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector<Object> & objs) (sockaddr *) &addr, sizeof(addr)); } +void Peer::updateIdentity() +{ + if (holds_alternative<shared_ptr<WaitingRef>>(identity)) + if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check()) + if (auto id = Identity::load(*ref)) + identity.emplace<Identity>(*id); +} + + +optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request) +{ + if (auto r = storage.ref(ref.digest())) + return *r; + + auto res = storage.copy(ref); + if (auto r = std::get_if<Ref>(&res)) + return *r; + + missing = std::get<vector<Digest>>(res); + if (request) + for (const auto & d : missing) + request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + + return nullopt; +} + + optional<TransportHeader> TransportHeader::load(const PartialRef & ref) { return load(*ref); |