summaryrefslogtreecommitdiff
path: root/src/network/protocol.h
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-08-31 22:17:16 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-09-29 10:02:17 +0200
commit81895699131121a1dab67ce026dcf8490c4de9e0 (patch)
tree5f373addbf9355a73d22c0b0ee840afe100eff93 /src/network/protocol.h
parenta689af61eb91dcbc135890276a3c6281166d30f9 (diff)
Network streams, accept for data response
Changelog: Implemented streams in network protocol
Diffstat (limited to 'src/network/protocol.h')
-rw-r--r--src/network/protocol.h83
1 files changed, 79 insertions, 4 deletions
diff --git a/src/network/protocol.h b/src/network/protocol.h
index ba40744..2592c9f 100644
--- a/src/network/protocol.h
+++ b/src/network/protocol.h
@@ -35,8 +35,12 @@ public:
static constexpr char defaultVersion[] = "0.1";
class Connection;
+ class Stream;
+ class InStream;
+ class OutStream;
struct Header;
+ struct StreamData;
struct ReceivedAnnounce;
struct NewConnection;
@@ -106,21 +110,83 @@ public:
optional<Header> receive(const PartialStorage &);
bool send(const PartialStorage &, NetworkProtocol::Header,
const vector<Object> &, bool secure);
+ bool send( const StreamData & chunk );
void close();
+ shared_ptr< InStream > openInStream( uint8_t sid );
+ shared_ptr< OutStream > openOutStream( uint8_t sid );
+
// temporary:
ChannelState & channel();
void trySendOutQueue();
private:
- static optional<Header> parsePacket(vector<uint8_t> & buf,
- Channel * channel, const PartialStorage & st,
- optional<uint64_t> & secure);
+ static variant< monostate, Header, StreamData >
+ parsePacket(vector<uint8_t> & buf,
+ Channel * channel, const PartialStorage & st,
+ optional<uint64_t> & secure);
unique_ptr<ConnectionPriv> p;
};
+class NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ Stream(uint8_t id_): id( id_ ) {}
+
+ bool hasDataLocked() const;
+
+ size_t writeLocked( const uint8_t * buf, size_t size );
+ size_t readLocked( uint8_t * buf, size_t size );
+
+ uint8_t id;
+ bool closed { false };
+ vector< uint8_t > writeBuffer;
+ vector< uint8_t > readBuffer;
+ vector< uint8_t >::const_iterator readPtr;
+ mutable mutex streamMutex;
+};
+
+class NetworkProtocol::InStream : public NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ InStream(uint8_t id): Stream( id ) {}
+
+public:
+ bool isComplete() const;
+ vector< uint8_t > readAll();
+ size_t read( uint8_t * buf, size_t size );
+
+protected:
+ void writeChunk( StreamData chunk );
+ bool tryUseChunkLocked( const StreamData & chunk );
+
+private:
+ uint64_t nextSequence { 0 };
+ vector< StreamData > outOfOrderChunks;
+};
+
+class NetworkProtocol::OutStream : public NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ OutStream(uint8_t id): Stream( id ) {}
+
+private:
+ StreamData getNextChunkLocked( size_t size );
+
+ uint64_t nextSequence { 0 };
+};
+
struct NetworkProtocol::ReceivedAnnounce { sockaddr_in6 addr; Digest digest; };
struct NetworkProtocol::NewConnection { Connection conn; };
struct NetworkProtocol::ConnectionReadReady { Connection::Id id; };
@@ -141,6 +207,7 @@ struct NetworkProtocol::Header
struct ChannelAccept { Digest value; };
struct ServiceType { UUID value; };
struct ServiceRef { Digest value; };
+ struct StreamOpen { uint8_t value; };
using Item = variant<
Acknowledged,
@@ -156,7 +223,8 @@ struct NetworkProtocol::Header
ChannelRequest,
ChannelAccept,
ServiceType,
- ServiceRef>;
+ ServiceRef,
+ StreamOpen>;
Header(const vector<Item> & items): items(items) {}
static optional<Header> load(const PartialRef &);
@@ -169,6 +237,13 @@ struct NetworkProtocol::Header
vector<Item> items;
};
+struct NetworkProtocol::StreamData
+{
+ uint8_t id;
+ uint8_t sequence;
+ vector< uint8_t > data;
+};
+
template<class T>
const T * NetworkProtocol::Header::lookupFirst() const
{