summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/network.cpp83
-rw-r--r--src/network.h24
-rw-r--r--src/storage.cpp34
-rw-r--r--src/storage.h1
4 files changed, 126 insertions, 16 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 64f9ed3..bd1ea8e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -2,12 +2,14 @@
#include "identity.h"
+#include <algorithm>
#include <cstring>
#include <ifaddrs.h>
#include <net/if.h>
#include <unistd.h>
+using std::holds_alternative;
using std::scoped_lock;
using std::unique_lock;
@@ -88,12 +90,19 @@ void Server::Priv::doListen()
if (ret < 0)
throw std::system_error(errno, std::generic_category());
- auto peer = getPeer(paddr);
+ auto & peer = getPeer(paddr);
if (auto dec = PartialObject::decodePrefix(peer.partStorage,
buf.begin(), buf.begin() + ret)) {
if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) {
+ auto pos = std::get<1>(*dec);
+ while (auto cdec = PartialObject::decodePrefix(peer.partStorage,
+ pos, buf.begin() + ret)) {
+ peer.partStorage.storeObject(std::get<PartialObject>(*cdec));
+ pos = std::get<1>(*cdec);
+ }
scoped_lock<mutex> hlock(dataMutex);
handlePacket(peer, *header);
+ peer.updateIdentity();
}
}
@@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
Peer * peer = new Peer {
.sock = sock,
.addr = paddr,
+ .identity = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
};
@@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
plaintextRefs.insert(obj.ref.digest());
+ vector<TransportHeader::Item> replyHeaders;
+ vector<Object> replyBody;
+
for (auto & item : header.items) {
switch (item.type) {
case TransportHeader::Type::Acknowledged:
@@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- peer.send(TransportHeader({ hitem }), { **ref });
+ replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref });
+ replyBody.push_back(**ref);
}
}
break;
}
case TransportHeader::Type::DataResponse:
+ if (auto pref = std::get<PartialRef>(item.value)) {
+ replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref });
+ for (auto & pwref : waiting) {
+ if (auto wref = pwref.lock()) {
+ if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
+ wref->missing.end()) {
+ if (wref->check(&replyHeaders))
+ pwref.reset();
+ }
+ }
+ }
+ waiting.erase(std::remove_if(waiting.begin(), waiting.end(),
+ [](auto & wref) { return wref.expired(); }), waiting.end());
+ }
break;
- case TransportHeader::Type::AnnounceSelf:
+ case TransportHeader::Type::AnnounceSelf: {
+ auto pref = std::get<PartialRef>(item.value);
+ if (pref.digest() == self.ref()->digest())
+ break;
+
+ if (holds_alternative<monostate>(peer.identity))
+ replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.identity = wref;
+ wref->check(&replyHeaders);
break;
+ }
case TransportHeader::Type::AnnounceUpdate:
break;
@@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
}
}
+
+ if (!replyHeaders.empty())
+ peer.send(TransportHeader(replyHeaders), replyBody);
}
-void Peer::send(const TransportHeader & header, const vector<Object> & objs)
+void Peer::send(const TransportHeader & header, const vector<Object> & objs) const
{
vector<uint8_t> data, part;
@@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector<Object> & objs)
(sockaddr *) &addr, sizeof(addr));
}
+void Peer::updateIdentity()
+{
+ if (holds_alternative<shared_ptr<WaitingRef>>(identity))
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check())
+ if (auto id = Identity::load(*ref))
+ identity.emplace<Identity>(*id);
+}
+
+
+optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
+{
+ if (auto r = storage.ref(ref.digest()))
+ return *r;
+
+ auto res = storage.copy(ref);
+ if (auto r = std::get_if<Ref>(&res))
+ return *r;
+
+ missing = std::get<vector<Digest>>(res);
+ if (request)
+ for (const auto & d : missing)
+ request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+
+ return nullopt;
+}
+
+
optional<TransportHeader> TransportHeader::load(const PartialRef & ref)
{
return load(*ref);
diff --git a/src/network.h b/src/network.h
index af202f6..bb32323 100644
--- a/src/network.h
+++ b/src/network.h
@@ -10,13 +10,16 @@
#include <netinet/in.h>
using std::condition_variable;
+using std::monostate;
using std::mutex;
using std::optional;
+using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::variant;
using std::vector;
+using std::weak_ptr;
namespace chrono = std::chrono;
using chrono::steady_clock;
@@ -25,13 +28,21 @@ namespace erebos {
struct Peer
{
+ Peer(const Peer &) = delete;
+ Peer & operator=(const Peer &) = delete;
+
const int sock;
const sockaddr_in addr;
+ variant<monostate,
+ shared_ptr<struct WaitingRef>,
+ Identity> identity;
+
Storage tempStorage;
PartialStorage partStorage;
- void send(const struct TransportHeader &, const vector<Object> &);
+ void send(const struct TransportHeader &, const vector<Object> &) const;
+ void updateIdentity();
};
struct TransportHeader
@@ -61,6 +72,16 @@ struct TransportHeader
const vector<Item> items;
};
+struct WaitingRef
+{
+ const Storage storage;
+ const PartialRef ref;
+ const Peer & peer;
+ vector<Digest> missing;
+
+ optional<Ref> check(vector<TransportHeader::Item> * request = nullptr);
+};
+
struct Server::Priv
{
Priv(const Identity & self);
@@ -84,6 +105,7 @@ struct Server::Priv
vector<unique_ptr<Peer>> peers;
vector<struct TransportHeader> outgoing;
+ vector<weak_ptr<WaitingRef>> waiting;
int sock;
vector<in_addr> bcastAddresses;
diff --git a/src/storage.cpp b/src/storage.cpp
index 608f82b..6b2e4f8 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -316,6 +316,19 @@ optional<Ref> Storage::ref(const Digest & digest) const
return Ref::create(*this, digest);
}
+Digest PartialStorage::Priv::storeBytes(const vector<uint8_t> & content) const
+{
+ array<uint8_t, Digest::size> arr;
+ int ret = blake2b(arr.data(), content.data(), nullptr,
+ Digest::size, content.size(), 0);
+ if (ret != 0)
+ throw runtime_error("failed to compute digest");
+
+ Digest digest(arr);
+ backend->storeBytes(digest, content);
+ return digest;
+}
+
optional<vector<uint8_t>> PartialStorage::Priv::loadBytes(const Digest & digest) const
{
auto ocontent = backend->loadBytes(digest);
@@ -339,6 +352,15 @@ optional<PartialObject> PartialStorage::loadObject(const Digest & digest) const
return nullopt;
}
+PartialRef PartialStorage::storeObject(const PartialObject & obj) const
+{ return ref(p->storeBytes(obj.encode())); }
+
+PartialRef PartialStorage::storeObject(const PartialRecord & val) const
+{ return storeObject(PartialObject(val)); }
+
+PartialRef PartialStorage::storeObject(const Blob & val) const
+{ return storeObject(PartialObject(val)); }
+
optional<Object> Storage::loadObject(const Digest & digest) const
{
if (auto content = p->loadBytes(digest))
@@ -380,17 +402,7 @@ optional<Digest> Storage::Priv::copy(const ObjectT<S> & pobj, vector<Digest> * m
if (fail)
return nullopt;
- auto content = pobj.encode();
-
- array<uint8_t, Digest::size> arr;
- int ret = blake2b(arr.data(), content.data(), nullptr,
- Digest::size, content.size(), 0);
- if (ret != 0)
- throw runtime_error("failed to compute digest");
-
- Digest digest(arr);
- backend->storeBytes(digest, content);
- return digest;
+ return storeBytes(pobj.encode());
}
variant<Ref, vector<Digest>> Storage::copy(const PartialRef & pref) const
diff --git a/src/storage.h b/src/storage.h
index 9e22a4a..86dc48f 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -102,6 +102,7 @@ struct Storage::Priv
{
shared_ptr<StorageBackend> backend;
+ Digest storeBytes(const vector<uint8_t> &) const;
optional<vector<uint8_t>> loadBytes(const Digest & digest) const;
template<class S>