// Private declarations for the networking code in namespace erebos: per-peer
// state, the transport header, the reply builder, and the server's private data.
//
// NOTE(review): this text appears to have lost everything that was written
// between '<' and the following '>' -- the targets of the bare `#include`
// directives and the template arguments of variant / vector / shared_ptr /
// weak_ptr / optional / Stored / enable_shared_from_this. The tokens below are
// kept exactly as found; restore the missing pieces from the upstream file
// before attempting to compile.
#pragma once

// NOTE(review): include target missing -- confirm against upstream.
#include

#include "channel.h"

// NOTE(review): five include targets missing. Presumably the standard headers
// backing the using-declarations below, plus whichever header declares
// sockaddr_in -- TODO confirm.
#include
#include
#include
#include
#include

using std::condition_variable;
using std::monostate;
using std::mutex;
using std::optional;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::variant;
using std::vector;
using std::tuple;
using std::weak_ptr;
using std::enable_shared_from_this;

namespace chrono = std::chrono;
using chrono::steady_clock;

namespace erebos {

class ReplyBuilder;
struct WaitingRef;

// Definition of the nested type Server::Peer (declared elsewhere).
// Copying is disabled: both the copy constructor and copy assignment are deleted.
struct Server::Peer
{
    Peer(const Peer &) = delete;
    Peer & operator=(const Peer &) = delete;

    // Reference to the Server::Priv this peer is attached to.
    Priv & server;
    // Socket address of the peer; const, so it is fixed at construction.
    const sockaddr_in addr;

    // Identity state of the peer. The last alternative is Identity.
    // NOTE(review): the leading alternatives were lost with the template
    // arguments -- confirm against upstream.
    variant, Identity> identity;

    // Channel state of the peer.
    // NOTE(review): template arguments lost; only the outer names
    // (shared_ptr, Stored, Stored) survive -- confirm against upstream.
    variant, shared_ptr, Stored, Stored> channel;

    Storage tempStorage;
    PartialStorage partStorage;

    // Queue of pending service items, empty on construction.
    // NOTE(review): element type lost -- confirm against upstream.
    vector>> serviceQueue {};

    // NOTE(review): pointee type lost; initialised to nullptr.
    shared_ptr lpeer = nullptr;

    // Sends a transport header together with a vector of payload items.
    // NOTE(review): vector element type lost. Does not modify the Peer (const).
    void send(const struct TransportHeader &, const vector &) const;

    // Update steps for the identity / channel / service state; each receives
    // the ReplyBuilder by mutable reference. Implementations are not visible here.
    void updateIdentity(ReplyBuilder &);
    void updateChannel(ReplyBuilder &);
    void updateService(ReplyBuilder &);
};

// Private part of the public Peer class.
// NOTE(review): the enable_shared_from_this argument was lost.
struct Peer::Priv : enable_shared_from_this
{
    // Non-owning links (weak_ptr).
    // NOTE(review): pointee types lost; presumably the server-side peer and
    // the owning peer list -- confirm.
    weak_ptr speer;
    weak_ptr list;
    // NOTE(review): presumably this peer's index within `list` -- confirm.
    size_t listIndex;

    void notifyWatchers();
};

// Private part of the public PeerList class.
// NOTE(review): the enable_shared_from_this argument was lost.
struct PeerList::Priv : enable_shared_from_this
{
    // NOTE(review): presumably guards `peers` and `watchers` -- confirm in
    // the implementation.
    mutex dataMutex;
    // NOTE(review): element types lost for both vectors.
    vector> peers;
    vector> watchers;

    // NOTE(review): shared_ptr pointee type lost; presumably appends a peer
    // to `peers` -- confirm in the implementation.
    void push(const shared_ptr &);
};

// A transport header: an immutable list of typed items.
struct TransportHeader
{
    // Kinds of header item.
    enum class Type {
        Acknowledged,
        DataRequest,
        DataResponse,
        AnnounceSelf,
        AnnounceUpdate,
        ChannelRequest,
        ChannelAccept,
        ServiceType,
        ServiceRef,
    };

    // One header entry: a type tag plus its value; both members are const.
    struct Item {
        const Type type;
        // NOTE(review): the variant's alternatives were lost -- confirm.
        const variant value;
    };

    // Copies the given items into the const `items` member.
    // NOTE(review): vector element type lost; presumably Item -- confirm.
    TransportHeader(const vector & items): items(items) {}

    // Build a header from a partial ref or a partial object; the result is
    // an optional.
    // NOTE(review): the optional's value type was lost; presumably
    // TransportHeader -- confirm.
    static optional load(const PartialRef &);
    static optional load(const PartialObject &);

    // Converts this header into a PartialObject.
    PartialObject toObject() const;

    const vector items;
};

// Collects header items and body entries for a reply.
class ReplyBuilder
{
public:
    // NOTE(review): presumably these append to mheader / mbody -- the
    // implementations are not visible here.
    void header(TransportHeader::Item &&);
    void body(const Ref &);

    // Read-only access to the stored header items.
    const vector & header() const { return mheader; }
    // Returns a vector by value.
    // NOTE(review): element type lost; relationship to mbody not visible here.
    vector body() const;

private:
    // NOTE(review): element types lost for both vectors.
    vector mheader;
    vector mbody;
};

// A partial ref associated with a peer, together with a `missing` list.
struct WaitingRef
{
    const Storage storage;
    const PartialRef ref;
    const Server::Peer & peer;
    // NOTE(review): element type lost.
    vector missing;

    // NOTE(review): the optional's value type was lost; semantics of the
    // check are not visible here.
    optional
    check(ReplyBuilder &);
};

// Private data and worker entry points of Server.
// NOTE(review): the enable_shared_from_this argument was lost.
struct Server::Priv : enable_shared_from_this
{
    // NOTE(review): element type of `svcs` lost.
    Priv(const Head & local, const Identity & self, vector> && svcs);
    ~Priv();
    // NOTE(review): shared_ptr pointee type lost; presumably Priv -- confirm.
    shared_ptr getptr();

    // NOTE(review): presumably the bodies run by threadListen and
    // threadAnnounce -- confirm in the implementation.
    void doListen();
    void doAnnounce();

    // Returns a reference to the Peer for the given socket address.
    // NOTE(review): whether a missing peer is created is not visible here.
    Peer & getPeer(const sockaddr_in & paddr);
    void handlePacket(Peer &, const TransportHeader &, ReplyBuilder &);
    void handleLocalHeadChange(const Head &);

    // Compile-time constants: port number 29665 and an interval of 60 seconds.
    constexpr static uint16_t discoveryPort { 29665 };
    constexpr static chrono::seconds announceInterval { 60 };

    mutex dataMutex;
    condition_variable announceCondvar;
    // Starts out false.
    // NOTE(review): presumably set to request worker shutdown -- confirm.
    bool finish = false;

    Identity self;
    Bhv localState;
    WatchedHead localHead;
    // NOTE(review): element type lost.
    vector> services;

    thread threadListen;
    thread threadAnnounce;

    // NOTE(review): element types lost for the vectors below.
    vector> peers;
    PeerList plist;

    vector outgoing;
    vector> waiting;

    // Socket descriptor.
    // NOTE(review): ownership and closing are not visible here.
    int sock;
    vector bcastAddresses;
};

} // namespace erebos