summaryrefslogtreecommitdiff
path: root/src/network.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network.cpp')
-rw-r--r--src/network.cpp112
1 files changed, 108 insertions, 4 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 40d1045..b1beb5c 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -10,7 +10,6 @@
using std::scoped_lock;
using std::unique_lock;
-using std::unique_ptr;
using namespace erebos;
@@ -76,6 +75,29 @@ Server::Priv::~Priv()
void Server::Priv::doListen()
{
+ vector<uint8_t> buf(4096);
+ unique_lock<mutex> lock(dataMutex);
+
+ while (!finish) {
+ sockaddr_in paddr;
+
+ lock.unlock();
+ socklen_t addrlen = sizeof(paddr);
+ ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0,
+ (sockaddr *) &paddr, &addrlen);
+ if (ret < 0)
+ throw std::system_error(errno, std::generic_category());
+
+ if (auto dec = Object::decodePrefix(self.ref()->storage(),
+ buf.begin(), buf.begin() + ret)) {
+ if (auto header = TransportHeader::load(std::get<Object>(*dec))) {
+ scoped_lock<mutex> hlock(dataMutex);
+ handlePacket(getPeer(paddr), *header);
+ }
+ }
+
+ lock.lock();
+ }
}
void Server::Priv::doAnnounce()
@@ -108,9 +130,85 @@ void Server::Priv::doAnnounce()
}
}
+Peer & Server::Priv::getPeer(const sockaddr_in & paddr)
+{
+ for (auto & peer : peers)
+ if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0)
+ return *peer;
+
+ Peer * peer = new Peer { .sock = sock, .addr = paddr };
+ peers.emplace_back(peer);
+ return *peer;
+}
+
+void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header)
+{
+ unordered_set<Digest> plaintextRefs;
+ for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref())))
+ plaintextRefs.insert(obj.ref.digest());
+
+ for (auto & item : header.items) {
+ switch (item.type) {
+ case TransportHeader::Type::Acknowledged:
+ break;
+
+ case TransportHeader::Type::DataRequest: {
+ auto ref = std::get<Ref>(item.value);
+ if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) {
+ TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref };
+ peer.send(TransportHeader({ hitem }), { *ref });
+ }
+ break;
+ }
+
+ case TransportHeader::Type::DataResponse:
+ break;
+
+ case TransportHeader::Type::AnnounceSelf:
+ break;
+
+ case TransportHeader::Type::AnnounceUpdate:
+ break;
+
+ case TransportHeader::Type::ChannelRequest:
+ break;
+
+ case TransportHeader::Type::ChannelAccept:
+ break;
+
+ case TransportHeader::Type::ServiceType:
+ break;
+
+ case TransportHeader::Type::ServiceRef:
+ break;
+
+ }
+ }
+}
+
+void Peer::send(const TransportHeader & header, const vector<Object> & objs)
+{
+ vector<uint8_t> data, part;
+
+ part = header.toObject().encode();
+ data.insert(data.end(), part.begin(), part.end());
+ for (const auto & obj : objs) {
+ part = obj.encode();
+ data.insert(data.end(), part.begin(), part.end());
+ }
+
+ sendto(sock, data.data(), data.size(), 0,
+ (sockaddr *) &addr, sizeof(addr));
+}
+
optional<TransportHeader> TransportHeader::load(const Ref & ref)
{
- auto rec = ref->asRecord();
+ return load(*ref);
+}
+
+optional<TransportHeader> TransportHeader::load(const Object & obj)
+{
+ auto rec = obj.asRecord();
if (!rec)
return nullopt;
@@ -176,7 +274,7 @@ optional<TransportHeader> TransportHeader::load(const Ref & ref)
return TransportHeader { .items = items };
}
-Ref TransportHeader::store(const Storage & st) const
+Object TransportHeader::toObject() const
{
vector<Record::Item> ritems;
@@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const
}
}
- return st.storeObject(Record(std::move(ritems)));
+ return Object(Record(std::move(ritems)));
+}
+
+Ref TransportHeader::store(const Storage & st) const
+{
+
+ return st.storeObject(toObject());
}