diff options
-rw-r--r-- | src/network.cpp | 83 | ||||
-rw-r--r-- | src/network.h | 24 | ||||
-rw-r--r-- | src/storage.cpp | 34 | ||||
-rw-r--r-- | src/storage.h | 1 |
4 files changed, 126 insertions, 16 deletions
diff --git a/src/network.cpp b/src/network.cpp index 64f9ed3..bd1ea8e 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,12 +2,14 @@ #include "identity.h" +#include <algorithm> #include <cstring> #include <ifaddrs.h> #include <net/if.h> #include <unistd.h> +using std::holds_alternative; using std::scoped_lock; using std::unique_lock; @@ -88,12 +90,19 @@ void Server::Priv::doListen() if (ret < 0) throw std::system_error(errno, std::generic_category()); - auto peer = getPeer(paddr); + auto & peer = getPeer(paddr); if (auto dec = PartialObject::decodePrefix(peer.partStorage, buf.begin(), buf.begin() + ret)) { if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) { + auto pos = std::get<1>(*dec); + while (auto cdec = PartialObject::decodePrefix(peer.partStorage, + pos, buf.begin() + ret)) { + peer.partStorage.storeObject(std::get<PartialObject>(*cdec)); + pos = std::get<1>(*cdec); + } scoped_lock<mutex> hlock(dataMutex); handlePacket(peer, *header); + peer.updateIdentity(); } } @@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr) Peer * peer = new Peer { .sock = sock, .addr = paddr, + .identity = monostate(), .tempStorage = st, .partStorage = st.derivePartialStorage(), }; @@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) plaintextRefs.insert(obj.ref.digest()); + vector<TransportHeader::Item> replyHeaders; + vector<Object> replyBody; + for (auto & item : header.items) { switch (item.type) { case TransportHeader::Type::Acknowledged: @@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) { if (auto ref = peer.tempStorage.ref(pref.digest())) { TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref }; - peer.send(TransportHeader({ hitem }), { **ref }); + replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref }); + replyBody.push_back(**ref); } } break; } case TransportHeader::Type::DataResponse: + if (auto pref = std::get<PartialRef>(item.value)) { + replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref }); + for (auto & pwref : waiting) { + if (auto wref = pwref.lock()) { + if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) != + wref->missing.end()) { + if (wref->check(&replyHeaders)) + pwref.reset(); + } + } + } + waiting.erase(std::remove_if(waiting.begin(), waiting.end(), + [](auto & wref) { return wref.expired(); }), waiting.end()); + } break; - case TransportHeader::Type::AnnounceSelf: + case TransportHeader::Type::AnnounceSelf: { + auto pref = std::get<PartialRef>(item.value); + if (pref.digest() == self.ref()->digest()) + break; + + if (holds_alternative<monostate>(peer.identity)) + replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()}); + + shared_ptr<WaitingRef> wref(new WaitingRef { + .storage = peer.tempStorage, + .ref = pref, + .peer = peer, + .missing = {}, + }); + waiting.push_back(wref); + peer.identity = wref; + wref->check(&replyHeaders); break; + } case TransportHeader::Type::AnnounceUpdate: break; @@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) } } + + if (!replyHeaders.empty()) + peer.send(TransportHeader(replyHeaders), replyBody); } -void Peer::send(const TransportHeader & header, const vector<Object> & objs) +void Peer::send(const TransportHeader & header, const vector<Object> & objs) const { vector<uint8_t> data, part; @@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector<Object> & objs) (sockaddr *) &addr, sizeof(addr)); } +void Peer::updateIdentity() +{ + if (holds_alternative<shared_ptr<WaitingRef>>(identity)) + if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check()) + if (auto id = Identity::load(*ref)) + identity.emplace<Identity>(*id); +} + + +optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request) +{ + if (auto r = storage.ref(ref.digest())) + return *r; + + auto res = storage.copy(ref); + if (auto r = std::get_if<Ref>(&res)) + return *r; + + missing = std::get<vector<Digest>>(res); + if (request) + for (const auto & d : missing) + request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) }); + + return nullopt; +} + + optional<TransportHeader> TransportHeader::load(const PartialRef & ref) { return load(*ref); diff --git a/src/network.h b/src/network.h index af202f6..bb32323 100644 --- a/src/network.h +++ b/src/network.h @@ -10,13 +10,16 @@ #include <netinet/in.h> using std::condition_variable; +using std::monostate; using std::mutex; using std::optional; +using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::variant; using std::vector; +using std::weak_ptr; namespace chrono = std::chrono; using chrono::steady_clock; @@ -25,13 +28,21 @@ namespace erebos { struct Peer { + Peer(const Peer &) = delete; + Peer & operator=(const Peer &) = delete; + const int sock; const sockaddr_in addr; + variant<monostate, + shared_ptr<struct WaitingRef>, + Identity> identity; + Storage tempStorage; PartialStorage partStorage; - void send(const struct TransportHeader &, const vector<Object> &); + void send(const struct TransportHeader &, const vector<Object> &) const; + void updateIdentity(); }; struct TransportHeader @@ -61,6 +72,16 @@ struct TransportHeader const vector<Item> items; }; +struct WaitingRef +{ + const Storage storage; + const PartialRef ref; + const Peer & peer; + vector<Digest> missing; + + optional<Ref> check(vector<TransportHeader::Item> * request = nullptr); +}; + struct Server::Priv { Priv(const Identity & self); @@ -84,6 +105,7 @@ struct Server::Priv vector<unique_ptr<Peer>> peers; vector<struct TransportHeader> outgoing; + vector<weak_ptr<WaitingRef>> waiting; int sock; vector<in_addr> bcastAddresses; diff --git a/src/storage.cpp b/src/storage.cpp index 608f82b..6b2e4f8 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -316,6 +316,19 @@ optional<Ref> Storage::ref(const Digest & digest) const return Ref::create(*this, digest); } +Digest PartialStorage::Priv::storeBytes(const vector<uint8_t> & content) const +{ + array<uint8_t, Digest::size> arr; + int ret = blake2b(arr.data(), content.data(), nullptr, + Digest::size, content.size(), 0); + if (ret != 0) + throw runtime_error("failed to compute digest"); + + Digest digest(arr); + backend->storeBytes(digest, content); + return digest; +} + optional<vector<uint8_t>> PartialStorage::Priv::loadBytes(const Digest & digest) const { auto ocontent = backend->loadBytes(digest); @@ -339,6 +352,15 @@ optional<PartialObject> PartialStorage::loadObject(const Digest & digest) const return nullopt; } +PartialRef PartialStorage::storeObject(const PartialObject & obj) const +{ return ref(p->storeBytes(obj.encode())); } + +PartialRef PartialStorage::storeObject(const PartialRecord & val) const +{ return storeObject(PartialObject(val)); } + +PartialRef PartialStorage::storeObject(const Blob & val) const +{ return storeObject(PartialObject(val)); } + optional<Object> Storage::loadObject(const Digest & digest) const { if (auto content = p->loadBytes(digest)) @@ -380,17 +402,7 @@ optional<Digest> Storage::Priv::copy(const ObjectT<S> & pobj, vector<Digest> * m if (fail) return nullopt; - auto content = pobj.encode(); - - array<uint8_t, Digest::size> arr; - int ret = blake2b(arr.data(), content.data(), nullptr, - Digest::size, content.size(), 0); - if (ret != 0) - throw runtime_error("failed to compute digest"); - - Digest digest(arr); - backend->storeBytes(digest, content); - return digest; + return storeBytes(pobj.encode()); } variant<Ref, vector<Digest>> Storage::copy(const PartialRef & pref) const diff --git a/src/storage.h b/src/storage.h index 9e22a4a..86dc48f 100644 --- a/src/storage.h +++ b/src/storage.h @@ -102,6 +102,7 @@ struct Storage::Priv { shared_ptr<StorageBackend> backend; + Digest storeBytes(const vector<uint8_t> &) const; optional<vector<uint8_t>> loadBytes(const Digest & digest) const; template<class S> |