From b97b503408911130d24d7f07f9247dca8314a316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Wed, 1 Jan 2020 21:27:11 +0100 Subject: Respond to data requests from network --- src/network.cpp | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 4 deletions(-) (limited to 'src/network.cpp') diff --git a/src/network.cpp b/src/network.cpp index 40d1045..b1beb5c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,7 +10,6 @@ using std::scoped_lock; using std::unique_lock; -using std::unique_ptr; using namespace erebos; @@ -76,6 +75,29 @@ Server::Priv::~Priv() void Server::Priv::doListen() { + vector buf(4096); + unique_lock lock(dataMutex); + + while (!finish) { + sockaddr_in paddr; + + lock.unlock(); + socklen_t addrlen = sizeof(paddr); + ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0, + (sockaddr *) &paddr, &addrlen); + if (ret < 0) + throw std::system_error(errno, std::generic_category()); + + if (auto dec = Object::decodePrefix(self.ref()->storage(), + buf.begin(), buf.begin() + ret)) { + if (auto header = TransportHeader::load(std::get(*dec))) { + scoped_lock hlock(dataMutex); + handlePacket(getPeer(paddr), *header); + } + } + + lock.lock(); + } } void Server::Priv::doAnnounce() @@ -108,9 +130,85 @@ void Server::Priv::doAnnounce() } } +Peer & Server::Priv::getPeer(const sockaddr_in & paddr) +{ + for (auto & peer : peers) + if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0) + return *peer; + + Peer * peer = new Peer { .sock = sock, .addr = paddr }; + peers.emplace_back(peer); + return *peer; +} + +void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) +{ + unordered_set plaintextRefs; + for (const auto & obj : collectStoredObjects(*Stored::load(*self.ref()))) + plaintextRefs.insert(obj.ref.digest()); + + for (auto & item : header.items) { + switch (item.type) { + case TransportHeader::Type::Acknowledged: + break; + + case TransportHeader::Type::DataRequest: { + auto ref = std::get(item.value); + if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) { + TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref }; + peer.send(TransportHeader({ hitem }), { *ref }); + } + break; + } + + case TransportHeader::Type::DataResponse: + break; + + case TransportHeader::Type::AnnounceSelf: + break; + + case TransportHeader::Type::AnnounceUpdate: + break; + + case TransportHeader::Type::ChannelRequest: + break; + + case TransportHeader::Type::ChannelAccept: + break; + + case TransportHeader::Type::ServiceType: + break; + + case TransportHeader::Type::ServiceRef: + break; + + } + } +} + +void Peer::send(const TransportHeader & header, const vector & objs) +{ + vector data, part; + + part = header.toObject().encode(); + data.insert(data.end(), part.begin(), part.end()); + for (const auto & obj : objs) { + part = obj.encode(); + data.insert(data.end(), part.begin(), part.end()); + } + + sendto(sock, data.data(), data.size(), 0, + (sockaddr *) &addr, sizeof(addr)); +} + optional TransportHeader::load(const Ref & ref) { - auto rec = ref->asRecord(); + return load(*ref); +} + +optional TransportHeader::load(const Object & obj) +{ + auto rec = obj.asRecord(); if (!rec) return nullopt; @@ -176,7 +274,7 @@ optional TransportHeader::load(const Ref & ref) return TransportHeader { .items = items }; } -Ref TransportHeader::store(const Storage & st) const +Object TransportHeader::toObject() const { vector ritems; @@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const } } - return st.storeObject(Record(std::move(ritems))); + return Object(Record(std::move(ritems))); +} + +Ref TransportHeader::store(const Storage & st) const +{ + + return st.storeObject(toObject()); } -- cgit v1.2.3