From b3cd90262f521d9c207b395f175cd3b26d2f4363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 31 Aug 2024 22:17:16 +0200 Subject: Network streams, accept for data response Changelog: Implemented streams in network protocol --- src/network/protocol.h | 83 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 4 deletions(-) (limited to 'src/network/protocol.h') diff --git a/src/network/protocol.h b/src/network/protocol.h index ba40744..2592c9f 100644 --- a/src/network/protocol.h +++ b/src/network/protocol.h @@ -35,8 +35,12 @@ public: static constexpr char defaultVersion[] = "0.1"; class Connection; + class Stream; + class InStream; + class OutStream; struct Header; + struct StreamData; struct ReceivedAnnounce; struct NewConnection; @@ -106,21 +110,83 @@ public: optional
receive(const PartialStorage &); bool send(const PartialStorage &, NetworkProtocol::Header, const vector &, bool secure); + bool send( const StreamData & chunk ); void close(); + shared_ptr< InStream > openInStream( uint8_t sid ); + shared_ptr< OutStream > openOutStream( uint8_t sid ); + // temporary: ChannelState & channel(); void trySendOutQueue(); private: - static optional
parsePacket(vector & buf, - Channel * channel, const PartialStorage & st, - optional & secure); + static variant< monostate, Header, StreamData > + parsePacket(vector & buf, + Channel * channel, const PartialStorage & st, + optional & secure); unique_ptr p; }; +class NetworkProtocol::Stream +{ + friend class NetworkProtocol; + friend class NetworkProtocol::Connection; + +protected: + Stream(uint8_t id_): id( id_ ) {} + + bool hasDataLocked() const; + + size_t writeLocked( const uint8_t * buf, size_t size ); + size_t readLocked( uint8_t * buf, size_t size ); + + uint8_t id; + bool closed { false }; + vector< uint8_t > writeBuffer; + vector< uint8_t > readBuffer; + vector< uint8_t >::const_iterator readPtr; + mutable mutex streamMutex; +}; + +class NetworkProtocol::InStream : public NetworkProtocol::Stream +{ + friend class NetworkProtocol; + friend class NetworkProtocol::Connection; + +protected: + InStream(uint8_t id): Stream( id ) {} + +public: + bool isComplete() const; + vector< uint8_t > readAll(); + size_t read( uint8_t * buf, size_t size ); + +protected: + void writeChunk( StreamData chunk ); + bool tryUseChunkLocked( const StreamData & chunk ); + +private: + uint64_t nextSequence { 0 }; + vector< StreamData > outOfOrderChunks; +}; + +class NetworkProtocol::OutStream : public NetworkProtocol::Stream +{ + friend class NetworkProtocol; + friend class NetworkProtocol::Connection; + +protected: + OutStream(uint8_t id): Stream( id ) {} + +private: + StreamData getNextChunkLocked( size_t size ); + + uint64_t nextSequence { 0 }; +}; + struct NetworkProtocol::ReceivedAnnounce { sockaddr_in6 addr; Digest digest; }; struct NetworkProtocol::NewConnection { Connection conn; }; struct NetworkProtocol::ConnectionReadReady { Connection::Id id; }; @@ -141,6 +207,7 @@ struct NetworkProtocol::Header struct ChannelAccept { Digest value; }; struct ServiceType { UUID value; }; struct ServiceRef { Digest value; }; + struct StreamOpen { uint8_t value; }; using Item = variant< Acknowledged, @@ -156,7 +223,8 @@ struct NetworkProtocol::Header ChannelRequest, ChannelAccept, ServiceType, - ServiceRef>; + ServiceRef, + StreamOpen>; Header(const vector & items): items(items) {} static optional
load(const PartialRef &); @@ -169,6 +237,13 @@ struct NetworkProtocol::Header vector items; }; +struct NetworkProtocol::StreamData +{ + uint8_t id; + uint8_t sequence; + vector< uint8_t > data; +}; + template const T * NetworkProtocol::Header::lookupFirst() const { -- cgit v1.2.3