summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp83
1 files changed, 79 insertions, 4 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 64f9ed3..bd1ea8e 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -2,12 +2,14 @@
#include "identity.h"
+#include <algorithm>
#include <cstring>
#include <ifaddrs.h>
#include <net/if.h>
#include <unistd.h>
+using std::holds_alternative;
using std::scoped_lock;
using std::unique_lock;
@@ -88,12 +90,19 @@ void Server::Priv::doListen()
if (ret < 0)
throw std::system_error(errno, std::generic_category());
- auto peer = getPeer(paddr);
+ auto & peer = getPeer(paddr);
if (auto dec = PartialObject::decodePrefix(peer.partStorage,
buf.begin(), buf.begin() + ret)) {
if (auto header = TransportHeader::load(std::get<PartialObject>(*dec))) {
+ auto pos = std::get<1>(*dec);
+ while (auto cdec = PartialObject::decodePrefix(peer.partStorage,
+ pos, buf.begin() + ret)) {
+ peer.partStorage.storeObject(std::get<PartialObject>(*cdec));
+ pos = std::get<1>(*cdec);
+ }
scoped_lock<mutex> hlock(dataMutex);
handlePacket(peer, *header);
+ peer.updateIdentity();
}
}
@@ -141,6 +150,7 @@ Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
Peer * peer = new Peer {
.sock = sock,
.addr = paddr,
+ .identity = monostate(),
.tempStorage = st,
.partStorage = st.derivePartialStorage(),
};
@@ -154,6 +164,9 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
plaintextRefs.insert(obj.ref.digest());
+ vector<TransportHeader::Item> replyHeaders;
+ vector<Object> replyBody;
+
for (auto & item : header.items) {
switch (item.type) {
case TransportHeader::Type::Acknowledged:
@@ -164,17 +177,49 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
if (plaintextRefs.find(pref.digest()) != plaintextRefs.end()) {
if (auto ref = peer.tempStorage.ref(pref.digest())) {
TransportHeader::Item hitem { TransportHeader::Type::DataResponse, *ref };
- peer.send(TransportHeader({ hitem }), { **ref });
+ replyHeaders.push_back({ TransportHeader::Type::DataResponse, *ref });
+ replyBody.push_back(**ref);
}
}
break;
}
case TransportHeader::Type::DataResponse:
+ if (auto pref = std::get<PartialRef>(item.value)) {
+ replyHeaders.push_back({ TransportHeader::Type::Acknowledged, pref });
+ for (auto & pwref : waiting) {
+ if (auto wref = pwref.lock()) {
+ if (std::find(wref->missing.begin(), wref->missing.end(), pref.digest()) !=
+ wref->missing.end()) {
+ if (wref->check(&replyHeaders))
+ pwref.reset();
+ }
+ }
+ }
+ waiting.erase(std::remove_if(waiting.begin(), waiting.end(),
+ [](auto & wref) { return wref.expired(); }), waiting.end());
+ }
break;
- case TransportHeader::Type::AnnounceSelf:
+ case TransportHeader::Type::AnnounceSelf: {
+ auto pref = std::get<PartialRef>(item.value);
+ if (pref.digest() == self.ref()->digest())
+ break;
+
+ if (holds_alternative<monostate>(peer.identity))
+ replyHeaders.push_back({ TransportHeader::Type::AnnounceSelf, *self.ref()});
+
+ shared_ptr<WaitingRef> wref(new WaitingRef {
+ .storage = peer.tempStorage,
+ .ref = pref,
+ .peer = peer,
+ .missing = {},
+ });
+ waiting.push_back(wref);
+ peer.identity = wref;
+ wref->check(&replyHeaders);
break;
+ }
case TransportHeader::Type::AnnounceUpdate:
break;
@@ -193,9 +238,12 @@ void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
}
}
+
+ if (!replyHeaders.empty())
+ peer.send(TransportHeader(replyHeaders), replyBody);
}
-void Peer::send(const TransportHeader & header, const vector<Object> & objs)
+void Peer::send(const TransportHeader & header, const vector<Object> & objs) const
{
vector<uint8_t> data, part;
@@ -210,6 +258,33 @@ void Peer::send(const TransportHeader & header, const vector<Object> & objs)
(sockaddr *) &addr, sizeof(addr));
}
+void Peer::updateIdentity()
+{
+ if (holds_alternative<shared_ptr<WaitingRef>>(identity))
+ if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check())
+ if (auto id = Identity::load(*ref))
+ identity.emplace<Identity>(*id);
+}
+
+
+optional<Ref> WaitingRef::check(vector<TransportHeader::Item> * request)
+{
+ if (auto r = storage.ref(ref.digest()))
+ return *r;
+
+ auto res = storage.copy(ref);
+ if (auto r = std::get_if<Ref>(&res))
+ return *r;
+
+ missing = std::get<vector<Digest>>(res);
+ if (request)
+ for (const auto & d : missing)
+ request->push_back({ TransportHeader::Type::DataRequest, peer.partStorage.ref(d) });
+
+ return nullopt;
+}
+
+
optional<TransportHeader> TransportHeader::load(const PartialRef & ref)
{
return load(*ref);