#pragma once #include #include "network/protocol.h" #include #include #include #include #include #include using std::condition_variable; using std::monostate; using std::mutex; using std::optional; using std::shared_lock; using std::shared_mutex; using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::variant; using std::vector; using std::tuple; using std::weak_ptr; using std::enable_shared_from_this; namespace chrono = std::chrono; using chrono::steady_clock; namespace erebos { class ReplyBuilder; struct WaitingRef; struct Server::Peer { Peer(const Peer &) = delete; Peer & operator=(const Peer &) = delete; Priv & server; NetworkProtocol::Connection connection; variant, Identity> identity; vector> identityUpdates; Storage tempStorage; PartialStorage partStorage; vector>> serviceQueue {}; vector< shared_ptr< NetworkProtocol::InStream >> dataResponseStreams {}; vector< Digest > requestedData {}; shared_ptr lpeer = nullptr; void updateIdentity(ReplyBuilder &, vector> & notifyPeers); void updateChannel(ReplyBuilder &); void finalizeChannel(ReplyBuilder &, unique_ptr); void updateService(ReplyBuilder &, vector, Service &, Ref>> & readyServices); void checkDataResponseStreams( ReplyBuilder & ); }; struct Peer::Priv : enable_shared_from_this { weak_ptr speer; weak_ptr list; size_t listIndex; void notifyWatchers(); void runServicesHandler(Service & service, Ref ref); }; struct PeerList::Priv : enable_shared_from_this { mutex dataMutex; vector> peers; vector> watchers; void push(const shared_ptr &); }; struct Server::Priv { Priv(const Head & local, const Identity & self); ~Priv(); shared_ptr getptr(); void startThreads(); void doListen(); void doAnnounce(); bool isSelfAddress(const sockaddr_in6 & paddr); Peer * findPeer(NetworkProtocol::Connection::Id cid) const; Peer & getPeer(const sockaddr_in6 & paddr); Peer & addPeer(NetworkProtocol::Connection conn); void handlePacket(Peer &, const NetworkProtocol::Header &, ReplyBuilder &); void handleLocalHeadChange(const Head &); constexpr static uint16_t discoveryPort { 29665 }; constexpr static chrono::seconds announceInterval { 60 }; mutable mutex dataMutex; condition_variable announceCondvar; bool finish = false; shared_mutex selfMutex; Identity self; const Bhv localState; thread threadListen; thread threadAnnounce; vector> peers; PeerList plist; vector outgoing; vector> waiting; NetworkProtocol protocol; vector localAddresses; vector bcastAddresses; // Stop watching before destroying other data WatchedHead localHead; // Start destruction with finalizing services vector> services; }; class ReplyBuilder { public: using Header = NetworkProtocol::Header; void header( Header::Item && ); void body( const Ref & ); void stream( shared_ptr< NetworkProtocol::OutStream >); const vector< Header::Item > & header() const { return mheader; } vector< Object > body() const; shared_ptr< NetworkProtocol::OutStream > stream() const { return mstream; } size_t size() const; private: vector< Header::Item > mheader; vector< Ref > mbody; size_t bodySize = 0; shared_ptr< NetworkProtocol::OutStream > mstream; }; struct WaitingRef { const Storage storage; const PartialRef ref; vector< Digest > missing; optional< Ref > check(); optional< Ref > check( ReplyBuilder &, const vector< Digest > &); }; }