diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2024-11-16 20:25:47 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2024-11-18 21:06:44 +0100 |
commit | 12497ed32f70a23552fd35161138b2e1812fc4f0 (patch) | |
tree | a3b2085ceacc56cfd7f553f42244a05bd9141e9d /src/network/protocol.cpp | |
parent | 47bd6afb103c5ddfb4a878e95416793610ed1be3 (diff) |
Network: use streams to send large objectsdevel
Diffstat (limited to 'src/network/protocol.cpp')
-rw-r--r-- | src/network/protocol.cpp | 76 |
1 files changed, 66 insertions, 10 deletions
diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index dbf1c40..89d6a88 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -16,14 +16,18 @@ using std::nullopt; using std::runtime_error; using std::scoped_lock; using std::to_string; +using std::unique_lock; using std::visit; namespace erebos { +static constexpr uint8_t maxStreamNumber = 0x3F; + struct NetworkProtocol::ConnectionPriv { Connection::Id id() const; + size_t mtu() const; bool send(const PartialStorage &, Header, const vector<Object> &, bool secure); bool send( const StreamData & chunk ); @@ -39,7 +43,7 @@ struct NetworkProtocol::ConnectionPriv ChannelState channel = monostate(); vector<vector<uint8_t>> secureOutQueue {}; - size_t mtu = 500; // TODO: MTU + size_t mtuLower = 1000; // TODO: MTU vector<uint64_t> toAcknowledge {}; @@ -94,11 +98,20 @@ NetworkProtocol::PollResult NetworkProtocol::poll() sendAck = not c->toAcknowledge.empty() && holds_alternative< unique_ptr< Channel >>( c->channel ); - for (const auto & s : c->outStreams) { - scoped_lock slock(s->streamMutex); + for (auto & s : c->outStreams) { + unique_lock slock(s->streamMutex); while (s->hasDataLocked()) - streamChunks.push_back( s->getNextChunkLocked( c->mtu ) ); + streamChunks.push_back( s->getNextChunkLocked( c->mtu() )); + if( s->closed ){ + // TODO: wait after ack + streamChunks.push_back( { s->id, (uint8_t) s->nextSequence, {} } ); + slock.unlock(); + s.reset(); + } } + + while( not c->outStreams.empty() && not c->outStreams.back() ) + c->outStreams.pop_back(); } if (sendAck) { auto pst = self->ref()->storage().deriveEphemeralStorage(); @@ -293,6 +306,8 @@ bool NetworkProtocol::verifyCookie(variant<sockaddr_in, sockaddr_in6> vaddr, con /* Connection */ /******************************************************************************/ +using Connection = NetworkProtocol::Connection; + NetworkProtocol::Connection::Id NetworkProtocol::ConnectionPriv::id() const { return reinterpret_cast<uintptr_t>(this); @@ -330,6 +345,24 @@ const sockaddr_in6 & NetworkProtocol::Connection::peerAddress() const return p->peerAddress; } +size_t Connection::mtu() const +{ + return p->mtu(); +} + +size_t NetworkProtocol::ConnectionPriv::mtu() const +{ + if( get_if< unique_ptr< Channel >>( &channel )) + return mtuLower // space for: + - 1 // "encrypted" tag + - 1 // counter + - 1 // channel number + - 1 // channel sequence + - 16 // tag + ; + return mtuLower - 128; // some space for cookie headers +} + optional<NetworkProtocol::Header> NetworkProtocol::Connection::receive(const PartialStorage & partStorage) { vector<uint8_t> buf; @@ -440,7 +473,7 @@ NetworkProtocol::Connection::parsePacket(vector<uint8_t> & buf, plainBegin = decrypted.begin() + 1; plainEnd = decrypted.end(); } - else if (decrypted[0] < 0x40) { + else if (decrypted[0] <= maxStreamNumber) { StreamData sdata; sdata.id = decrypted[0]; sdata.sequence = decrypted[1]; @@ -597,12 +630,17 @@ shared_ptr< NetworkProtocol::InStream > NetworkProtocol::Connection::openInStrea return p->inStreams.back(); } -shared_ptr< NetworkProtocol::OutStream > NetworkProtocol::Connection::openOutStream( uint8_t sid ) +shared_ptr< NetworkProtocol::OutStream > NetworkProtocol::Connection::openOutStream() { scoped_lock lock( p->cmutex ); - for (const auto & s : p->outStreams) - if (s->id == sid) - throw runtime_error("outbound stream " + to_string(sid) + " already open"); + + uint8_t sid = 1; + if( not p->outStreams.empty() ){ + if( p->outStreams.back()->id < maxStreamNumber ) + sid = p->outStreams.back()->id + 1; + else + throw runtime_error("no free outbound stream"); + } p->outStreams.emplace_back( new OutStream( sid )); return p->outStreams.back(); @@ -636,9 +674,21 @@ void NetworkProtocol::Connection::trySendOutQueue() } +NetworkProtocol::Stream::Stream(uint8_t id_): + id(id_) +{ + readPtr = readBuffer.begin(); +} + +void NetworkProtocol::Stream::close() +{ + scoped_lock lock( streamMutex ); + closed = true; +} + bool NetworkProtocol::Stream::hasDataLocked() const { - return not writeBuffer.empty() || not readBuffer.empty(); + return not writeBuffer.empty() || readPtr < readBuffer.end(); } size_t NetworkProtocol::Stream::writeLocked( const uint8_t * buf, size_t size ) @@ -725,6 +775,12 @@ bool NetworkProtocol::InStream::tryUseChunkLocked( const StreamData & chunk ) return true; } +size_t NetworkProtocol::OutStream::write( const uint8_t * buf, size_t size ) +{ + scoped_lock lock( streamMutex ); + return writeLocked( buf, size ); +} + NetworkProtocol::StreamData NetworkProtocol::OutStream::getNextChunkLocked( size_t size ) { StreamData res; |