summaryrefslogtreecommitdiff
path: root/src/storage.cpp
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2019-10-26 22:19:56 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2019-12-10 21:29:28 +0100
commit361891f25ca735fd85db64a14823cc55b8a0619a (patch)
treed45734fbc123d8dcb26128b314eb4453ecdd154e /src/storage.cpp
parent1a1b36dea942cd7b18067f3f1220c9ab4f9b4448 (diff)
Basic object encoding and storage
Diffstat (limited to 'src/storage.cpp')
-rw-r--r--src/storage.cpp501
1 files changed, 501 insertions, 0 deletions
diff --git a/src/storage.cpp b/src/storage.cpp
new file mode 100644
index 0000000..2e7feb7
--- /dev/null
+++ b/src/storage.cpp
@@ -0,0 +1,501 @@
+#include "storage.h"
+#include "base64.h"
+
+#include <algorithm>
+#include <charconv>
+#include <chrono>
+#include <fstream>
+#include <iomanip>
+#include <iterator>
+#include <stdexcept>
+#include <thread>
+
+#include <stdio.h>
+
+#include <blake2.h>
+#include <zlib.h>
+
+using namespace erebos;
+
+using std::array;
+using std::copy;
+using std::holds_alternative;
+using std::ifstream;
+using std::make_shared;
+using std::nullopt;
+using std::runtime_error;
+using std::shared_ptr;
+using std::string;
+using std::to_string;
+
+optional<Storage> Storage::open(fs::path path)
+{
+ if (!fs::is_directory(path))
+ fs::create_directory(path);
+
+ if (!fs::is_directory(path/"objects"))
+ fs::create_directory(path/"objects");
+
+ if (!fs::is_directory(path/"heads"))
+ fs::create_directory(path/"heads");
+
+ return Storage(shared_ptr<const Priv>(new Priv { path }));
+}
+
+fs::path Storage::Priv::objectPath(const Digest & digest) const
+{
+ string name(digest);
+ return root/"objects"/
+ fs::path(name.begin(), name.begin() + 2)/
+ fs::path(name.begin() + 2, name.end());
+}
+
+optional<Ref> Storage::ref(const Digest & digest) const
+{
+ return Ref::create(*this, digest);
+}
+
+optional<vector<uint8_t>> Storage::Priv::loadBytes(const Digest & digest) const
+{
+ vector<uint8_t> in(Priv::CHUNK);
+ vector<uint8_t> out;
+ size_t decoded = 0;
+
+ z_stream strm;
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ strm.avail_in = 0;
+ strm.next_in = Z_NULL;
+ int ret = inflateInit(&strm);
+ if (ret != Z_OK)
+ throw runtime_error("zlib initialization failed");
+
+ ifstream fin(objectPath(digest), std::ios::binary);
+ if (!fin.is_open())
+ return nullopt;
+
+ while (!fin.eof() && ret != Z_STREAM_END) {
+ fin.read((char*) in.data(), in.size());
+ if (fin.bad()) {
+ inflateEnd(&strm);
+ throw runtime_error("failed to read stored file");
+ }
+ strm.avail_in = fin.gcount();
+ if (strm.avail_in == 0)
+ break;
+ strm.next_in = in.data();
+
+ do {
+ if (out.size() < decoded + in.size())
+ out.resize(decoded + in.size());
+
+ strm.avail_out = out.size() - decoded;
+ strm.next_out = out.data() + decoded;
+ ret = inflate(&strm, Z_NO_FLUSH);
+ switch (ret) {
+ case Z_STREAM_ERROR:
+ case Z_NEED_DICT:
+ case Z_DATA_ERROR:
+ case Z_MEM_ERROR:
+ inflateEnd(&strm);
+ throw runtime_error("zlib decoding failed");
+ }
+ decoded = out.size() - strm.avail_out;
+ } while (strm.avail_out == 0);
+ }
+
+
+ inflateEnd(&strm);
+ if (ret != Z_STREAM_END)
+ throw runtime_error("zlib decoding failed");
+
+ out.resize(decoded);
+ return out;
+}
+
+optional<Object> Storage::load(const Digest & digest) const
+{
+ auto ocontent = p->loadBytes(digest);
+ if (!ocontent.has_value())
+ return nullopt;
+ auto content = ocontent.value();
+
+ array<uint8_t, Digest::size> arr;
+ int ret = blake2b(arr.data(), content.data(), nullptr,
+ Digest::size, content.size(), 0);
+ if (ret != 0 || digest != Digest(arr))
+ throw runtime_error("digest verification failed");
+
+ return Object::decode(*this, content);
+}
+
+void Storage::Priv::storeBytes(const Digest & digest, const vector<uint8_t> & in) const
+{
+ vector<uint8_t> out(Priv::CHUNK);
+
+ z_stream strm;
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ int ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
+ if (ret != Z_OK)
+ throw runtime_error("zlib initialization failed");
+
+ auto path = objectPath(digest);
+ auto lock = path;
+ lock += ".lock";
+
+ fs::create_directories(path.parent_path());
+
+ // No way to use open exclusively in c++ stdlib
+ FILE *f = nullptr;
+ for (int i = 0; i < 10; i++) {
+ f = fopen(lock.c_str(), "wbxe");
+ if (f || errno != EEXIST)
+ break;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ if (fs::exists(path)) {
+ if (f) {
+ fclose(f);
+ fs::remove(lock);
+ }
+ return;
+ }
+ if (!f)
+ throw runtime_error("failed to open storage file");
+
+ strm.avail_in = in.size();
+ strm.next_in = const_cast<uint8_t*>(in.data());
+ do {
+ strm.avail_out = out.size();
+ strm.next_out = out.data();
+ ret = deflate(&strm, Z_FINISH);
+ if (ret == Z_STREAM_ERROR)
+ break;
+ size_t have = out.size() - strm.avail_out;
+ if (fwrite(out.data(), 1, have, f) != have || ferror(f)) {
+ ret = Z_ERRNO;
+ break;
+ }
+ } while (strm.avail_out == 0);
+
+ fclose(f);
+ deflateEnd(&strm);
+
+ if (strm.avail_in != 0 || ret != Z_STREAM_END) {
+ fs::remove(lock);
+ throw runtime_error("failed to deflate object");
+ }
+
+ fs::rename(lock, path);
+}
+
+Ref Storage::store(const Object & object) const
+{
+ // TODO: ensure storage transitively
+ auto content = object.encode();
+
+ array<uint8_t, Digest::size> arr;
+ int ret = blake2b(arr.data(), content.data(), nullptr,
+ Digest::size, content.size(), 0);
+ if (ret != 0)
+ throw runtime_error("failed to compute digest");
+
+ Digest digest(arr);
+ p->storeBytes(digest, content);
+ return Ref::create(*this, digest).value();
+}
+
+
+Digest::Digest(const string & str)
+{
+ if (str.size() != 2 * size)
+ throw runtime_error("invalid ref digest");
+
+ for (int i = 0; i < size; i++)
+ std::from_chars(str.data() + 2 * i,
+ str.data() + 2 * i + 2,
+ value[i], 16);
+}
+
+Digest::operator string() const
+{
+ string res(size * 2, '0');
+ for (int i = 0; i < size; i++)
+ std::to_chars(res.data() + 2 * i + (value[i] < 0x10),
+ res.data() + 2 * i + 2,
+ value[i], 16);
+ return res;
+}
+
+
+optional<Ref> Ref::create(Storage st, const Digest & digest)
+{
+ if (!fs::exists(st.p->objectPath(digest)))
+ return nullopt;
+
+ auto p = new Priv {
+ .storage = st,
+ .digest = digest,
+ };
+
+ p->object = std::async(std::launch::deferred, [p] {
+ auto obj = p->storage.load(p->digest);
+ if (!obj.has_value())
+ throw runtime_error("failed to decode bytes");
+
+ return obj.value();
+ });
+
+ return Ref(shared_ptr<Priv>(p));
+}
+
+const Digest & Ref::digest() const
+{
+ return p->digest;
+}
+
+const Object & Ref::operator*() const
+{
+ return p->object.get();
+}
+
+const Object * Ref::operator->() const
+{
+ return &p->object.get();
+}
+
+
+optional<int> Record::Item::asInteger() const
+{
+ if (holds_alternative<int>(value))
+ return std::get<int>(value);
+ return nullopt;
+}
+
+optional<string> Record::Item::asText() const
+{
+ if (holds_alternative<string>(value))
+ return std::get<string>(value);
+ return nullopt;
+}
+
+optional<vector<uint8_t>> Record::Item::asBinary() const
+{
+ if (holds_alternative<vector<uint8_t>>(value))
+ return std::get<vector<uint8_t>>(value);
+ return nullopt;
+}
+
+optional<Ref> Record::Item::asRef() const
+{
+ if (holds_alternative<Ref>(value))
+ return std::get<Ref>(value);
+ return nullopt;
+}
+
+
+Record::Record(const vector<Item> & from):
+ ptr(new vector<Item>(from))
+{}
+
+Record Record::decode(Storage st,
+ vector<uint8_t>::const_iterator begin,
+ vector<uint8_t>::const_iterator end)
+{
+ auto items = make_shared<vector<Item>>();
+
+ while (begin != end) {
+ const auto newline = std::find(begin, end, '\n');
+ if (newline == end)
+ throw runtime_error("invalid record");
+
+ const auto colon = std::find(begin, newline, ':');
+ if (colon == newline)
+ throw runtime_error("invalid record");
+
+ const auto space = std::find(colon, newline, ' ');
+ if (space == newline)
+ throw runtime_error("invalid record");
+
+ const auto name = string(begin, colon);
+ const auto type = string(colon + 1, space);
+ const auto value = string(space + 1, newline);
+
+ if (type == "i")
+ items->emplace_back(name, std::stoi(value));
+ else if (type == "t")
+ items->emplace_back(name, value);
+ else if (type == "b")
+ items->emplace_back(name, base64::decode(value));
+ else if (type == "r.b2")
+ items->emplace_back(name, Ref::create(st, Digest(value)).value());
+ else
+ throw runtime_error("unknown record item type");
+
+ begin = newline + 1;
+ }
+
+ return Record(items);
+}
+
+vector<uint8_t> Record::encode() const
+{
+ return Object(*this).encode();
+}
+
+const vector<Record::Item> & Record::items() const
+{
+ return *ptr;
+}
+
+optional<Record::Item> Record::item(const string & name) const
+{
+ for (auto item : *ptr) {
+ if (item.name == name)
+ return item;
+ }
+ return nullopt;
+}
+
+optional<Record::Item> Record::operator[](const string & name) const
+{
+ return item(name);
+}
+
+vector<Record::Item> Record::items(const string & name) const
+{
+ vector<Item> res;
+ for (auto item : *ptr) {
+ if (item.name == name)
+ res.push_back(item);
+ }
+ return res;
+}
+
+
+vector<uint8_t> Record::encodeInner() const
+{
+ vector<uint8_t> res;
+ auto inserter = std::back_inserter(res);
+ for (const auto & item : *ptr) {
+ copy(item.name.begin(), item.name.end(), inserter);
+ inserter = ':';
+
+ string type;
+ string value;
+
+ if (auto x = item.asInteger()) {
+ type = "i";
+ value = to_string(*x);
+ } else if (auto x = item.asText()) {
+ type = "t";
+ value = *x;
+ } else if (auto x = item.asBinary()) {
+ type = "b";
+ value = base64::encode(*x);
+ } else if (auto x = item.asRef()) {
+ type = "r.b2";
+ value = string(x->digest());
+ } else {
+ throw runtime_error("unhandeled record item type");
+ }
+
+ copy(type.begin(), type.end(), inserter);
+ inserter = ' ';
+ copy(value.begin(), value.end(), inserter);
+ inserter = '\n';
+ }
+ return res;
+}
+
+
+Blob::Blob(const vector<uint8_t> & vec):
+ ptr(make_shared<vector<uint8_t>>(vec))
+{}
+
+vector<uint8_t> Blob::encode() const
+{
+ return Object(*this).encode();
+}
+
+vector<uint8_t> Blob::encodeInner() const
+{
+ return *ptr;
+}
+
+Blob Blob::decode(Storage,
+ vector<uint8_t>::const_iterator begin,
+ vector<uint8_t>::const_iterator end)
+{
+ return Blob(make_shared<vector<uint8_t>>(begin, end));
+}
+
+
+optional<Object> Object::decode(Storage st, const vector<uint8_t> & data)
+{
+ auto newline = std::find(data.begin(), data.end(), '\n');
+ if (newline == data.end())
+ return nullopt;
+
+ auto space = std::find(data.begin(), newline, ' ');
+ if (space == newline)
+ return nullopt;
+
+ size_t size = std::stoi(string(space + 1, newline));
+ if (data.end() - newline - 1 != size)
+ return nullopt;
+
+ string type(data.begin(), space);
+ if (type == "rec")
+ return Object(Record::decode(st, newline + 1, data.end()));
+ else if (type == "blob")
+ return Object(Blob::decode(st, newline + 1, data.end()));
+ else
+ throw runtime_error("unknown object type '" + type + "'");
+
+ return nullopt;
+}
+
+vector<uint8_t> Object::encode() const
+{
+ vector<uint8_t> res, inner;
+ string type;
+
+ if (auto rec = asRecord()) {
+ type = "rec";
+ inner = rec->encodeInner();
+ } else if (auto blob = asBlob()) {
+ type = "blob";
+ inner = blob->encodeInner();
+ } else {
+ throw runtime_error("unhandeled object type");
+ }
+
+ auto inserter = std::back_inserter(res);
+ copy(type.begin(), type.end(), inserter);
+ inserter = ' ';
+
+ auto slen = to_string(inner.size());
+ copy(slen.begin(), slen.end(), inserter);
+ inserter = '\n';
+
+ copy(inner.begin(), inner.end(), inserter);
+ return res;
+}
+
+optional<Record> Object::asRecord() const
+{
+ if (holds_alternative<Record>(content))
+ return std::get<Record>(content);
+ return nullopt;
+}
+
+optional<Blob> Object::asBlob() const
+{
+ if (holds_alternative<Blob>(content))
+ return std::get<Blob>(content);
+ return nullopt;
+}