diff options
Diffstat (limited to 'src/network.h')
-rw-r--r-- | src/network.h | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/src/network.h b/src/network.h new file mode 100644 index 0000000..ed02167 --- /dev/null +++ b/src/network.h @@ -0,0 +1,171 @@ +#pragma once + +#include <erebos/network.h> + +#include "network/protocol.h" + +#include <condition_variable> +#include <mutex> +#include <shared_mutex> +#include <thread> +#include <vector> + +#include <netinet/in.h> + +using std::condition_variable; +using std::monostate; +using std::mutex; +using std::optional; +using std::shared_lock; +using std::shared_mutex; +using std::shared_ptr; +using std::string; +using std::thread; +using std::unique_ptr; +using std::variant; +using std::vector; +using std::tuple; +using std::weak_ptr; + +using std::enable_shared_from_this; + +namespace chrono = std::chrono; +using chrono::steady_clock; + +namespace erebos { + +class ReplyBuilder; +struct WaitingRef; + +struct Server::Peer +{ + Peer(const Peer &) = delete; + Peer & operator=(const Peer &) = delete; + + Priv & server; + NetworkProtocol::Connection connection; + + variant<monostate, + shared_ptr<struct WaitingRef>, + Identity> identity; + vector<shared_ptr<WaitingRef>> identityUpdates; + + Storage tempStorage; + PartialStorage partStorage; + + vector<tuple<UUID, shared_ptr<WaitingRef>>> serviceQueue {}; + vector< shared_ptr< NetworkProtocol::InStream >> dataResponseStreams {}; + vector< Digest > requestedData {}; + + shared_ptr<erebos::Peer::Priv> lpeer = nullptr; + + void updateIdentity(ReplyBuilder &, vector<shared_ptr<erebos::Peer::Priv>> & notifyPeers); + void updateChannel(ReplyBuilder &); + void finalizeChannel(ReplyBuilder &, unique_ptr<Channel>); + void updateService(ReplyBuilder &, vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> & readyServices); + void checkDataResponseStreams( ReplyBuilder & ); +}; + +struct Peer::Priv : enable_shared_from_this<Peer::Priv> +{ + weak_ptr<Server::Peer> speer; + weak_ptr<PeerList::Priv> list; + size_t listIndex; + + void notifyWatchers(); + void runServicesHandler(Service & service, Ref ref); +}; + +struct PeerList::Priv : enable_shared_from_this<PeerList::Priv> +{ + mutex dataMutex; + vector<shared_ptr<Peer::Priv>> peers; + vector<function<void(size_t, const Peer *)>> watchers; + + void push(const shared_ptr<Server::Peer> &); +}; + +struct Server::Priv +{ + Priv(const Head<LocalState> & local, const Identity & self); + ~Priv(); + + shared_ptr<Priv> getptr(); + + void startThreads(); + + void doListen(); + void doAnnounce(); + + bool isSelfAddress(const sockaddr_in6 & paddr); + Peer * findPeer(NetworkProtocol::Connection::Id cid) const; + Peer & getPeer(const sockaddr_in6 & paddr); + Peer & addPeer(NetworkProtocol::Connection conn); + void handlePacket(Peer &, const NetworkProtocol::Header &, ReplyBuilder &); + + void handleLocalHeadChange(const Head<LocalState> &); + + constexpr static uint16_t discoveryPort { 29665 }; + constexpr static chrono::seconds announceInterval { 60 }; + + mutable mutex dataMutex; + condition_variable announceCondvar; + bool finish = false; + + shared_mutex selfMutex; + Identity self; + const Bhv<LocalState> localState; + + thread threadListen; + thread threadAnnounce; + + vector<shared_ptr<Peer>> peers; + PeerList plist; + + vector<struct NetworkProtocol::Header> outgoing; + vector<weak_ptr<WaitingRef>> waiting; + + NetworkProtocol protocol; + vector<in_addr> localAddresses; + vector<in_addr> bcastAddresses; + + // Stop watching before destroying other data + WatchedHead<LocalState> localHead; + + // Start destruction with finalizing services + vector<unique_ptr<Service>> services; +}; + +class ReplyBuilder +{ +public: + using Header = NetworkProtocol::Header; + + void header( Header::Item && ); + void body( const Ref & ); + void stream( shared_ptr< NetworkProtocol::OutStream >); + + const vector< Header::Item > & header() const { return mheader; } + vector< Object > body() const; + shared_ptr< NetworkProtocol::OutStream > stream() const { return mstream; } + + size_t size() const; + +private: + vector< Header::Item > mheader; + vector< Ref > mbody; + size_t bodySize = 0; + shared_ptr< NetworkProtocol::OutStream > mstream; +}; + +struct WaitingRef +{ + const Storage storage; + const PartialRef ref; + vector< Digest > missing; + + optional< Ref > check(); + optional< Ref > check( ReplyBuilder &, const vector< Digest > &); +}; + +} |