summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/erebos/storage.h22
-rw-r--r--src/network.cpp112
-rw-r--r--src/network.h17
-rw-r--r--src/storage.cpp67
-rw-r--r--src/storage.h4
5 files changed, 209 insertions, 13 deletions
diff --git a/include/erebos/storage.h b/include/erebos/storage.h
index 95a4574..edb0aca 100644
--- a/include/erebos/storage.h
+++ b/include/erebos/storage.h
@@ -1,6 +1,7 @@
#pragma once
#include <array>
+#include <cstring>
#include <filesystem>
#include <memory>
#include <optional>
@@ -183,7 +184,15 @@ public:
Object(Variants content): content(content) {}
Object & operator=(const Object &) = delete;
+ static std::optional<std::tuple<Object, std::vector<uint8_t>::const_iterator>>
+ decodePrefix(Storage, std::vector<uint8_t>::const_iterator,
+ std::vector<uint8_t>::const_iterator);
+
static std::optional<Object> decode(Storage, const std::vector<uint8_t> &);
+ static std::optional<Object> decode(Storage,
+ std::vector<uint8_t>::const_iterator,
+ std::vector<uint8_t>::const_iterator);
+ static std::vector<Object> decodeMany(Storage, const std::vector<uint8_t> &);
std::vector<uint8_t> encode() const;
static std::optional<Object> load(const Ref &);
@@ -297,3 +306,16 @@ bool Stored<T>::precedes(const Stored<T> & other) const
}
}
+
+namespace std
+{
+ template<> struct hash<erebos::Digest>
+ {
+ std::size_t operator()(const erebos::Digest & dgst) const noexcept
+ {
+ std::size_t res;
+ std::memcpy(&res, dgst.arr().data(), sizeof res);
+ return res;
+ }
+ };
+}
diff --git a/src/network.cpp b/src/network.cpp
index 40d1045..b1beb5c 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -10,7 +10,6 @@
using std::scoped_lock;
using std::unique_lock;
-using std::unique_ptr;
using namespace erebos;
@@ -76,6 +75,29 @@ Server::Priv::~Priv()
void Server::Priv::doListen()
{
+ vector<uint8_t> buf(4096);
+ unique_lock<mutex> lock(dataMutex);
+
+ while (!finish) {
+ sockaddr_in paddr;
+
+ lock.unlock();
+ socklen_t addrlen = sizeof(paddr);
+ ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0,
+ (sockaddr *) &paddr, &addrlen);
+ if (ret < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ if (auto dec = Object::decodePrefix(self.ref()->storage(),
+ buf.begin(), buf.begin() + ret)) {
+ if (auto header = TransportHeader::load(std::get<Object>(*dec))) {
+ scoped_lock<mutex> hlock(dataMutex);
+ handlePacket(getPeer(paddr), *header);
+ }
+ }
+
+ lock.lock();
+ }
}
void Server::Priv::doAnnounce()
@@ -108,9 +130,85 @@ void Server::Priv::doAnnounce()
}
}
+Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
+{
+ for (auto & peer : peers)
+ if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0)
+ return *peer;
+
+ Peer * peer = new Peer { .sock = sock, .addr = paddr };
+ peers.emplace_back(peer);
+ return *peer;
+}
+
+void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
+{
+ unordered_set<Digest> plaintextRefs;
+ for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
+ plaintextRefs.insert(obj.ref.digest());
+
+ for (auto & item : header.items) {
+ switch (item.type) {
+ case TransportHeader::Type::Acknowledged:
+ break;
+
+ case TransportHeader::Type::DataRequest: {
+ auto ref = std::get<Ref>(item.value);
+ if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) {
+ TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref };
+ peer.send(TransportHeader({ hitem }), { *ref });
+ }
+ break;
+ }
+
+ case TransportHeader::Type::DataResponse:
+ break;
+
+ case TransportHeader::Type::AnnounceSelf:
+ break;
+
+ case TransportHeader::Type::AnnounceUpdate:
+ break;
+
+ case TransportHeader::Type::ChannelRequest:
+ break;
+
+ case TransportHeader::Type::ChannelAccept:
+ break;
+
+ case TransportHeader::Type::ServiceType:
+ break;
+
+ case TransportHeader::Type::ServiceRef:
+ break;
+
+ }
+ }
+}
+
+void Peer::send(const TransportHeader & header, const vector<Object> & objs)
+{
+ vector<uint8_t> data, part;
+
+ part = header.toObject().encode();
+ data.insert(data.end(), part.begin(), part.end());
+ for (const auto & obj : objs) {
+ part = obj.encode();
+ data.insert(data.end(), part.begin(), part.end());
+ }
+
+ sendto(sock, data.data(), data.size(), 0,
+ (sockaddr *) &addr, sizeof(addr));
+}
+
optional<TransportHeader> TransportHeader::load(const Ref & ref)
{
- auto rec = ref->asRecord();
+ return load(*ref);
+}
+
+optional<TransportHeader> TransportHeader::load(const Object & obj)
+{
+ auto rec = obj.asRecord();
if (!rec)
return nullopt;
@@ -176,7 +274,7 @@ optional<TransportHeader> TransportHeader::load(const Ref & ref)
return TransportHeader { .items = items };
}
-Ref TransportHeader::store(const Storage & st) const
+Object TransportHeader::toObject() const
{
vector<Record::Item> ritems;
@@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const
}
}
- return st.storeObject(Record(std::move(ritems)));
+ return Object(Record(std::move(ritems)));
+}
+
+Ref TransportHeader::store(const Storage & st) const
+{
+
+ return st.storeObject(toObject());
}
diff --git a/src/network.h b/src/network.h
index bf01cfb..ad29496 100644
--- a/src/network.h
+++ b/src/network.h
@@ -14,6 +14,7 @@ using std::mutex;
using std::optional;
using std::string;
using std::thread;
+using std::unique_ptr;
using std::variant;
using std::vector;
@@ -22,6 +23,14 @@ using chrono::steady_clock;
namespace erebos {
+struct Peer
+{
+ const int sock;
+ const sockaddr_in addr;
+
+ void send(const struct TransportHeader &, const vector<Object> &);
+};
+
struct TransportHeader
{
enum class Type {
@@ -43,6 +52,8 @@ struct TransportHeader
TransportHeader(const vector<Item> & items): items(items) {}
static optional<TransportHeader> load(const Ref &);
+ static optional<TransportHeader> load(const Object &);
+ Object toObject() const;
Ref store(const Storage & st) const;
const vector<Item> items;
@@ -55,6 +66,9 @@ struct Server::Priv
void doListen();
void doAnnounce();
+ Peer & getPeer(const sockaddr_in & paddr);
+ void handlePacket(Peer &, const TransportHeader &);
+
constexpr static uint16_t discoveryPort { 29665 };
constexpr static chrono::seconds announceInterval { 60 };
@@ -66,6 +80,9 @@ struct Server::Priv
thread threadListen;
thread threadAnnounce;
+ vector<unique_ptr<Peer>> peers;
+ vector<struct TransportHeader> outgoing;
+
int sock;
vector<in_addr> bcastAddresses;
};
diff --git a/src/storage.cpp b/src/storage.cpp
index e611da0..0d9b52e 100644
--- a/src/storage.cpp
+++ b/src/storage.cpp
@@ -29,6 +29,7 @@ using std::runtime_error;
using std::shared_ptr;
using std::string;
using std::to_string;
+using std::tuple;
optional<Storage> Storage::open(fs::path path)
{
@@ -492,29 +493,52 @@ Blob Blob::decode(Storage,
return Blob(make_shared<vector<uint8_t>>(begin, end));
}
-
-optional<Object> Object::decode(Storage st, const vector<uint8_t> & data)
+optional<tuple<Object, vector<uint8_t>::const_iterator>>
+Object::decodePrefix(Storage st,
+ vector<uint8_t>::const_iterator begin,
+ vector<uint8_t>::const_iterator end)
{
- auto newline = std::find(data.begin(), data.end(), '\n');
- if (newline == data.end())
+ auto newline = std::find(begin, end, '\n');
+ if (newline == end)
return nullopt;
- auto space = std::find(data.begin(), newline, ' ');
+ auto space = std::find(begin, newline, ' ');
if (space == newline)
return nullopt;
ssize_t size = std::stoi(string(space + 1, newline));
- if (data.end() - newline - 1 != size)
+ if (end - newline - 1 < size)
return nullopt;
+ auto cend = newline + 1 + size;
- string type(data.begin(), space);
+ string type(begin, space);
+ optional<Object> obj;
if (type == "rec")
- return Object(Record::decode(st, newline + 1, data.end()));
+ obj.emplace(Record::decode(st, newline + 1, cend));
else if (type == "blob")
- return Object(Blob::decode(st, newline + 1, data.end()));
+ obj.emplace(Blob::decode(st, newline + 1, cend));
else
throw runtime_error("unknown object type '" + type + "'");
+ if (obj)
+ return std::make_tuple(*obj, cend);
+ return nullopt;
+}
+
+optional<Object> Object::decode(Storage st, const vector<uint8_t> & data)
+{
+ return decode(st, data.begin(), data.end());
+}
+
+optional<Object> Object::decode(Storage st,
+ vector<uint8_t>::const_iterator begin,
+ vector<uint8_t>::const_iterator end)
+{
+ if (auto res = decodePrefix(st, begin, end)) {
+ auto [obj, next] = *res;
+ if (next == end)
+ return obj;
+ }
return nullopt;
}
@@ -563,3 +587,28 @@ optional<Blob> Object::asBlob() const
return std::get<Blob>(content);
return nullopt;
}
+
+vector<Stored<Object>> erebos::collectStoredObjects(const Stored<Object> & from)
+{
+ unordered_set<Digest> seen;
+ vector<Stored<Object>> queue { from };
+ vector<Stored<Object>> res;
+
+ while (!queue.empty()) {
+ auto cur = queue.back();
+ queue.pop_back();
+
+ auto [it, added] = seen.insert(cur.ref.digest());
+ if (!added)
+ continue;
+
+ res.push_back(cur);
+
+ if (auto rec = cur->asRecord())
+ for (const auto & item : rec->items())
+ if (auto ref = item.asRef())
+ queue.push_back(*Stored<Object>::load(*ref));
+ }
+
+ return res;
+}
diff --git a/src/storage.h b/src/storage.h
index e675848..68002fa 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -3,11 +3,13 @@
#include "erebos/storage.h"
#include <future>
+#include <unordered_set>
namespace fs = std::filesystem;
using std::optional;
using std::shared_future;
+using std::unordered_set;
using std::vector;
namespace erebos {
@@ -32,4 +34,6 @@ struct Ref::Priv
shared_future<Object> object;
};
+vector<Stored<Object>> collectStoredObjects(const Stored<Object> &);
+
}