diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/network.cpp | 197 | ||||
-rw-r--r-- | src/network.h | 5 |
2 files changed, 126 insertions, 76 deletions
diff --git a/src/network.cpp b/src/network.cpp index 7f37cf7..e63949f 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -158,9 +158,11 @@ string Peer::name() const optional<Identity> Peer::identity() const { - if (auto speer = p->speer.lock()) + if (auto speer = p->speer.lock()) { + scoped_lock lock(speer->server.dataMutex); if (holds_alternative<Identity>(speer->identity)) return std::get<Identity>(speer->identity); + } return nullopt; } @@ -192,10 +194,37 @@ uint16_t Peer::port() const void Peer::Priv::notifyWatchers() { - if (auto slist = list.lock()) { - Peer p(shared_from_this()); - for (const auto & w : slist->watchers) - w(listIndex, &p); + vector<function<void(size_t, const Peer *)>> callbacks; + + { + if (auto slist = list.lock()) { + scoped_lock lock(slist->dataMutex); + callbacks = slist->watchers; + } + } + + Peer p(shared_from_this()); + for (const auto & w : callbacks) + w(listIndex, &p); +} + +void Peer::Priv::runServicesHandler(Service & service, Ref ref) +{ + if (auto sptr = speer.lock()) { + Service::Context ctx { nullptr }; + + sptr->server.localHead.update([&] (const Stored<LocalState> & local) { + ctx = Service::Context(new Service::Context::Priv { + .ref = ref, + .peer = erebos::Peer(shared_from_this()), + .local = local, + }); + + service.handle(ctx); + return ctx.local(); + }); + + ctx.runAfterCommitHooks(); } } @@ -242,41 +271,53 @@ PeerList::~PeerList() = default; void PeerList::Priv::push(const shared_ptr<Server::Peer> & speer) { - scoped_lock lock(dataMutex); - size_t s = peers.size(); + vector<function<void(size_t, const Peer *)>> callbacks; + size_t s; - speer->lpeer.reset(new Peer::Priv); - speer->lpeer->speer = speer; - speer->lpeer->list = shared_from_this(); - speer->lpeer->listIndex = s; + { + scoped_lock lock(dataMutex); + s = peers.size(); - Peer p(speer->lpeer); + speer->lpeer.reset(new Peer::Priv); + speer->lpeer->speer = speer; + speer->lpeer->list = shared_from_this(); + speer->lpeer->listIndex = s; - peers.push_back(speer->lpeer); - for (const auto & w : watchers) + peers.push_back(speer->lpeer); + callbacks = watchers; + } + + Peer p(speer->lpeer); + for (const auto & w : callbacks) w(s, &p); } size_t PeerList::size() const { + scoped_lock lock(p->dataMutex); return p->peers.size(); } Peer PeerList::at(size_t i) const { + scoped_lock lock(p->dataMutex); return Peer(p->peers.at(i)); } void PeerList::onUpdate(function<void(size_t, const Peer *)> w) { - scoped_lock lock(p->dataMutex); - for (size_t i = 0; i < p->peers.size(); i++) { - if (auto speer = p->peers[i]->speer.lock()) { - Peer peer(speer->lpeer); - w(i, &peer); - } + vector<Peer> peers; + + { + scoped_lock lock(p->dataMutex); + for (size_t i = 0; i < p->peers.size(); i++) + if (auto speer = p->peers[i]->speer.lock()) + peers.emplace_back(speer->lpeer); + p->watchers.push_back(w); } - p->watchers.push_back(w); + + for (size_t i = 0; i < peers.size(); i++) + w(i, &peers[i]); } @@ -381,6 +422,9 @@ void Server::Priv::doListen() if (!peer) continue; + vector<shared_ptr<erebos::Peer::Priv>> notifyPeers; + vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> readyServices; + if (auto header = peer->connection.receive(peer->partStorage)) { ReplyBuilder reply; @@ -388,9 +432,9 @@ void Server::Priv::doListen() shared_lock slock(selfMutex); handlePacket(*peer, *header, reply); - peer->updateIdentity(reply); + peer->updateIdentity(reply, notifyPeers); peer->updateChannel(reply); - peer->updateService(reply); + peer->updateService(reply, readyServices); if (!reply.header().empty()) peer->connection.send(peer->partStorage, @@ -398,6 +442,11 @@ void Server::Priv::doListen() peer->connection.trySendOutQueue(); } + + for (const auto & p : notifyPeers) + p->notifyWatchers(); + for (const auto & [ lpeer, service, ref ] : readyServices) + lpeer->runServicesHandler(service, ref); } } @@ -452,41 +501,55 @@ Server::Peer * Server::Priv::findPeer(NetworkProtocol::Connection::Id cid) const Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr) { - scoped_lock lock(dataMutex); + shared_ptr<Peer> peer; + shared_ptr<PeerList::Priv> sptr; - for (auto & peer : peers) - if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0) - return *peer; - - auto st = self.ref()->storage().deriveEphemeralStorage(); - shared_ptr<Peer> peer(new Peer { - .server = *this, - .connection = protocol.connect(paddr), - .identity = monostate(), - .identityUpdates = {}, - .tempStorage = st, - .partStorage = st.derivePartialStorage(), - }); - peers.push_back(peer); - plist.p->push(peer); + { + scoped_lock lock(dataMutex); + + for (auto & peer : peers) + if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0) + return *peer; + + auto st = self.ref()->storage().deriveEphemeralStorage(); + peer.reset(new Peer { + .server = *this, + .connection = protocol.connect(paddr), + .identity = monostate(), + .identityUpdates = {}, + .tempStorage = st, + .partStorage = st.derivePartialStorage(), + }); + peers.push_back(peer); + sptr = plist.p; + } + + sptr->push(peer); return *peer; } Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn) { - scoped_lock lock(dataMutex); + shared_ptr<Peer> peer; + shared_ptr<PeerList::Priv> sptr; - auto st = self.ref()->storage().deriveEphemeralStorage(); - shared_ptr<Peer> peer(new Peer { - .server = *this, - .connection = move(conn), - .identity = monostate(), - .identityUpdates = {}, - .tempStorage = st, - .partStorage = st.derivePartialStorage(), - }); - peers.push_back(peer); - plist.p->push(peer); + { + scoped_lock lock(dataMutex); + + auto st = self.ref()->storage().deriveEphemeralStorage(); + peer.reset(new Peer { + .server = *this, + .connection = move(conn), + .identity = monostate(), + .identityUpdates = {}, + .tempStorage = st, + .partStorage = st.derivePartialStorage(), + }); + peers.push_back(peer); + sptr = plist.p; + } + + sptr->push(peer); return *peer; } @@ -655,14 +718,14 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head) } } -void Server::Peer::updateIdentity(ReplyBuilder &) +void Server::Peer::updateIdentity(ReplyBuilder &, vector<shared_ptr<erebos::Peer::Priv>> & notifyPeers) { if (holds_alternative<shared_ptr<WaitingRef>>(identity)) { if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check()) if (auto id = Identity::load(*ref)) { identity.emplace<Identity>(*id); if (lpeer) - lpeer->notifyWatchers(); + notifyPeers.push_back(lpeer); } } else if (holds_alternative<Identity>(identity)) { @@ -684,7 +747,7 @@ void Server::Peer::updateIdentity(ReplyBuilder &) if (nid != get<Identity>(identity)) { identity = move(nid); if (lpeer) - lpeer->notifyWatchers(); + notifyPeers.push_back(lpeer); } } } @@ -742,32 +805,18 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch) reply.header(NetworkProtocol::Header::AnnounceUpdate { r.digest() }); } -void Server::Peer::updateService(ReplyBuilder & reply) +void Server::Peer::updateService(ReplyBuilder & reply, vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> & readyServices) { decltype(serviceQueue) next; for (auto & x : serviceQueue) { if (auto ref = std::get<1>(x)->check(reply)) { if (lpeer) { - Service::Context ctx { nullptr }; - - server.localHead.update([&] (const Stored<LocalState> & local) { - ctx = Service::Context(new Service::Context::Priv { - .ref = *ref, - .peer = erebos::Peer(lpeer), - .local = local, - }); - - for (auto & svc : server.services) { - if (svc->uuid() == std::get<UUID>(x)) { - svc->handle(ctx); - break; - } + for (auto & svc : server.services) { + if (svc->uuid() == std::get<UUID>(x)) { + readyServices.emplace_back(lpeer, *svc, *ref); + break; } - - return ctx.local(); - }); - - ctx.runAfterCommitHooks(); + } } } else { next.push_back(std::move(x)); diff --git a/src/network.h b/src/network.h index d1fae15..12ec4e1 100644 --- a/src/network.h +++ b/src/network.h @@ -57,10 +57,10 @@ struct Server::Peer shared_ptr<erebos::Peer::Priv> lpeer = nullptr; - void updateIdentity(ReplyBuilder &); + void updateIdentity(ReplyBuilder &, vector<shared_ptr<erebos::Peer::Priv>> & notifyPeers); void updateChannel(ReplyBuilder &); void finalizeChannel(ReplyBuilder &, unique_ptr<Channel>); - void updateService(ReplyBuilder &); + void updateService(ReplyBuilder &, vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> & readyServices); }; struct Peer::Priv : enable_shared_from_this<Peer::Priv> @@ -70,6 +70,7 @@ struct Peer::Priv : enable_shared_from_this<Peer::Priv> size_t listIndex; void notifyWatchers(); + void runServicesHandler(Service & service, Ref ref); }; struct PeerList::Priv : enable_shared_from_this<PeerList::Priv> |