#pragma once #include #include "channel.h" #include #include #include #include #include #include using std::condition_variable; using std::monostate; using std::mutex; using std::optional; using std::shared_lock; using std::shared_mutex; using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::variant; using std::vector; using std::tuple; using std::weak_ptr; using std::enable_shared_from_this; namespace chrono = std::chrono; using chrono::steady_clock; namespace erebos { class ReplyBuilder; struct WaitingRef; struct Server::Peer { Peer(const Peer &) = delete; Peer & operator=(const Peer &) = delete; Priv & server; const sockaddr_in addr; variant, Identity> identity; vector> identityUpdates; variant, shared_ptr, Stored, unique_ptr> channel; Storage tempStorage; PartialStorage partStorage; vector>> serviceQueue {}; vector> secureOutQueue {}; shared_ptr lpeer = nullptr; void send(const struct TransportHeader &, const vector &, bool secure); void updateIdentity(ReplyBuilder &); void updateChannel(ReplyBuilder &); void updateService(ReplyBuilder &); void trySendOutQueue(); }; struct Peer::Priv : enable_shared_from_this { weak_ptr speer; weak_ptr list; size_t listIndex; void notifyWatchers(); }; struct PeerList::Priv : enable_shared_from_this { mutex dataMutex; vector> peers; vector> watchers; void push(const shared_ptr &); }; struct TransportHeader { enum class Type { Acknowledged, DataRequest, DataResponse, AnnounceSelf, AnnounceUpdate, ChannelRequest, ChannelAccept, ServiceType, ServiceRef, }; struct Item { const Type type; const variant value; }; TransportHeader(const vector & items): items(items) {} static optional load(const PartialRef &); static optional load(const PartialObject &); PartialObject toObject() const; const vector items; }; class ReplyBuilder { public: void header(TransportHeader::Item &&); void body(const Ref &); const vector & header() const { return mheader; } vector body() const; private: vector mheader; vector mbody; }; struct WaitingRef { const Storage storage; const PartialRef ref; const Server::Peer & peer; vector missing; optional check(); optional check(ReplyBuilder &); }; struct Server::Priv { Priv(const Head & local, const Identity & self); ~Priv(); shared_ptr getptr(); void doListen(); void doAnnounce(); bool isSelfAddress(const sockaddr_in & paddr); Peer & getPeer(const sockaddr_in & paddr); void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &); void handleLocalHeadChange(const Head &); constexpr static uint16_t discoveryPort { 29665 }; constexpr static chrono::seconds announceInterval { 60 }; mutex dataMutex; condition_variable announceCondvar; bool finish = false; shared_mutex selfMutex; Identity self; const Bhv localState; thread threadListen; thread threadAnnounce; vector> peers; PeerList plist; vector outgoing; vector> waiting; int sock; vector localAddresses; vector bcastAddresses; // Stop watching before destroying other data WatchedHead localHead; // Start destruction with finalizing services vector> services; }; }