diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2020-01-01 21:27:11 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2020-01-04 21:29:00 +0100 |
commit | b97b503408911130d24d7f07f9247dca8314a316 (patch) | |
tree | 3b4202b42bbe51b2cd5b3720e755a804d7872159 /src/network.cpp | |
parent | ebdbf9a1cd5308bf1c64d8dc912e0ea0e9ac8633 (diff) |
Respond to data requests from network
Diffstat (limited to 'src/network.cpp')
-rw-r--r-- | src/network.cpp | 112 |
1 files changed, 108 insertions, 4 deletions
diff --git a/src/network.cpp b/src/network.cpp index 40d1045..b1beb5c 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,7 +10,6 @@ using std::scoped_lock; using std::unique_lock; -using std::unique_ptr; using namespace erebos; @@ -76,6 +75,29 @@ Server::Priv::~Priv() void Server::Priv::doListen() { + vector<uint8_t> buf(4096); + unique_lock<mutex> lock(dataMutex); + + while (!finish) { + sockaddr_in paddr; + + lock.unlock(); + socklen_t addrlen = sizeof(paddr); + ssize_t ret = recvfrom(sock, buf.data(), buf.size(), 0, + (sockaddr *) &paddr, &addrlen); + if (ret < 0) + throw std::system_error(errno, std::generic_category()); + + if (auto dec = Object::decodePrefix(self.ref()->storage(), + buf.begin(), buf.begin() + ret)) { + if (auto header = TransportHeader::load(std::get<Object>(*dec))) { + scoped_lock<mutex> hlock(dataMutex); + handlePacket(getPeer(paddr), *header); + } + } + + lock.lock(); + } } void Server::Priv::doAnnounce() @@ -108,9 +130,85 @@ void Server::Priv::doAnnounce() } } +Peer & Server::Priv::getPeer(const sockaddr_in & paddr) +{ + for (auto & peer : peers) + if (memcmp(&peer->addr, &paddr, sizeof paddr) == 0) + return *peer; + + Peer * peer = new Peer { .sock = sock, .addr = paddr }; + peers.emplace_back(peer); + return *peer; +} + +void Server::Priv::handlePacket(Peer & peer, const TransportHeader & header) +{ + unordered_set<Digest> plaintextRefs; + for (const auto & obj : collectStoredObjects(*Stored<Object>::load(*self.ref()))) + plaintextRefs.insert(obj.ref.digest()); + + for (auto & item : header.items) { + switch (item.type) { + case TransportHeader::Type::Acknowledged: + break; + + case TransportHeader::Type::DataRequest: { + auto ref = std::get<Ref>(item.value); + if (plaintextRefs.find(ref.digest()) != plaintextRefs.end()) { + TransportHeader::Item hitem { TransportHeader::Type::DataResponse, ref }; + peer.send(TransportHeader({ hitem }), { *ref }); + } + break; + } + + case TransportHeader::Type::DataResponse: + break; + + case TransportHeader::Type::AnnounceSelf: + break; + + case TransportHeader::Type::AnnounceUpdate: + break; + + case TransportHeader::Type::ChannelRequest: + break; + + case TransportHeader::Type::ChannelAccept: + break; + + case TransportHeader::Type::ServiceType: + break; + + case TransportHeader::Type::ServiceRef: + break; + + } + } +} + +void Peer::send(const TransportHeader & header, const vector<Object> & objs) +{ + vector<uint8_t> data, part; + + part = header.toObject().encode(); + data.insert(data.end(), part.begin(), part.end()); + for (const auto & obj : objs) { + part = obj.encode(); + data.insert(data.end(), part.begin(), part.end()); + } + + sendto(sock, data.data(), data.size(), 0, + (sockaddr *) &addr, sizeof(addr)); +} + optional<TransportHeader> TransportHeader::load(const Ref & ref) { - auto rec = ref->asRecord(); + return load(*ref); +} + +optional<TransportHeader> TransportHeader::load(const Object & obj) +{ + auto rec = obj.asRecord(); if (!rec) return nullopt; @@ -176,7 +274,7 @@ optional<TransportHeader> TransportHeader::load(const Ref & ref) return TransportHeader { .items = items }; } -Ref TransportHeader::store(const Storage & st) const +Object TransportHeader::toObject() const { vector<Record::Item> ritems; @@ -220,5 +318,11 @@ Ref TransportHeader::store(const Storage & st) const } } - return st.storeObject(Record(std::move(ritems))); + return Object(Record(std::move(ritems))); +} + +Ref TransportHeader::store(const Storage & st) const +{ + + return st.storeObject(toObject()); } |