summaryrefslogtreecommitdiff
path: root/src/network/protocol.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/protocol.h')
-rw-r--r--src/network/protocol.h107
1 files changed, 79 insertions, 28 deletions
diff --git a/src/network/protocol.h b/src/network/protocol.h
index ba40744..2db4e63 100644
--- a/src/network/protocol.h
+++ b/src/network/protocol.h
@@ -35,8 +35,12 @@ public:
static constexpr char defaultVersion[] = "0.1";
class Connection;
+ class Stream;
+ class InStream;
+ class OutStream;
struct Header;
+ struct StreamData;
struct ReceivedAnnounce;
struct NewConnection;
@@ -106,21 +110,83 @@ public:
optional<Header> receive(const PartialStorage &);
bool send(const PartialStorage &, NetworkProtocol::Header,
const vector<Object> &, bool secure);
+ bool send( const StreamData & chunk );
void close();
+ shared_ptr< InStream > openInStream( uint8_t sid );
+ shared_ptr< OutStream > openOutStream( uint8_t sid );
+
// temporary:
ChannelState & channel();
void trySendOutQueue();
private:
- static optional<Header> parsePacket(vector<uint8_t> & buf,
- Channel * channel, const PartialStorage & st,
- optional<uint64_t> & secure);
+ static variant< monostate, Header, StreamData >
+ parsePacket(vector<uint8_t> & buf,
+ Channel * channel, const PartialStorage & st,
+ optional<uint64_t> & secure);
unique_ptr<ConnectionPriv> p;
};
+class NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ Stream(uint8_t id_): id( id_ ) {}
+
+ bool hasDataLocked() const;
+
+ size_t writeLocked( const uint8_t * buf, size_t size );
+ size_t readLocked( uint8_t * buf, size_t size );
+
+ uint8_t id;
+ bool closed { false };
+ vector< uint8_t > writeBuffer;
+ vector< uint8_t > readBuffer;
+ vector< uint8_t >::const_iterator readPtr;
+ mutable mutex streamMutex;
+};
+
+class NetworkProtocol::InStream : public NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ InStream(uint8_t id): Stream( id ) {}
+
+public:
+ bool isComplete() const;
+ vector< uint8_t > readAll();
+ size_t read( uint8_t * buf, size_t size );
+
+protected:
+ void writeChunk( StreamData chunk );
+ bool tryUseChunkLocked( const StreamData & chunk );
+
+private:
+ uint64_t nextSequence { 0 };
+ vector< StreamData > outOfOrderChunks;
+};
+
+class NetworkProtocol::OutStream : public NetworkProtocol::Stream
+{
+ friend class NetworkProtocol;
+ friend class NetworkProtocol::Connection;
+
+protected:
+ OutStream(uint8_t id): Stream( id ) {}
+
+private:
+ StreamData getNextChunkLocked( size_t size );
+
+ uint64_t nextSequence { 0 };
+};
+
struct NetworkProtocol::ReceivedAnnounce { sockaddr_in6 addr; Digest digest; };
struct NetworkProtocol::NewConnection { Connection conn; };
struct NetworkProtocol::ConnectionReadReady { Connection::Id id; };
@@ -141,6 +207,7 @@ struct NetworkProtocol::Header
struct ChannelAccept { Digest value; };
struct ServiceType { UUID value; };
struct ServiceRef { Digest value; };
+ struct StreamOpen { uint8_t value; };
using Item = variant<
Acknowledged,
@@ -156,7 +223,8 @@ struct NetworkProtocol::Header
ChannelRequest,
ChannelAccept,
ServiceType,
- ServiceRef>;
+ ServiceRef,
+ StreamOpen>;
Header(const vector<Item> & items): items(items) {}
static optional<Header> load(const PartialRef &);
@@ -169,6 +237,13 @@ struct NetworkProtocol::Header
vector<Item> items;
};
+struct NetworkProtocol::StreamData
+{
+ uint8_t id;
+ uint8_t sequence;
+ vector< uint8_t > data;
+};
+
template<class T>
const T * NetworkProtocol::Header::lookupFirst() const
{
@@ -186,28 +261,4 @@ inline bool operator!=(const NetworkProtocol::Header::Item & left,
inline bool operator==(const NetworkProtocol::Cookie & left, const NetworkProtocol::Cookie & right)
{ return left.value == right.value; }
-class ReplyBuilder
-{
-public:
- void header(NetworkProtocol::Header::Item &&);
- void body(const Ref &);
-
- const vector<NetworkProtocol::Header::Item> & header() const { return mheader; }
- vector<Object> body() const;
-
-private:
- vector<NetworkProtocol::Header::Item> mheader;
- vector<Ref> mbody;
-};
-
-struct WaitingRef
-{
- const Storage storage;
- const PartialRef ref;
- vector<Digest> missing;
-
- optional<Ref> check();
- optional<Ref> check(ReplyBuilder &);
-};
-
}