summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-08-13 19:42:02 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-08-13 21:29:05 +0200
commite7687e020f6d31a22aadb08aa73ae000796ae139 (patch)
treec7072d2d9949245cdbc48c7cd3bb934b169a4f11
parenta4d433bf79ad4f4cb99dc9d5b632ea9cff054a91 (diff)
Server: avoid executing callbacks while holding locks
-rw-r--r--src/network.cpp197
-rw-r--r--src/network.h5
2 files changed, 126 insertions, 76 deletions
diff --git a/src/network.cpp b/src/network.cpp
index 7f37cf7..e63949f 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -158,9 +158,11 @@ string Peer::name() const
optional<Identity> Peer::identity() const
{
- if (auto speer = p->speer.lock())
+ if (auto speer = p->speer.lock()) {
+ scoped_lock lock(speer->server.dataMutex);
if (holds_alternative<Identity>(speer->identity))
return std::get<Identity>(speer->identity);
+ }
return nullopt;
}
@@ -192,10 +194,37 @@ uint16_t Peer::port() const
void Peer::Priv::notifyWatchers()
{
- if (auto slist = list.lock()) {
- Peer p(shared_from_this());
- for (const auto & w : slist->watchers)
- w(listIndex, &p);
+ vector<function<void(size_t, const Peer *)>> callbacks;
+
+ {
+ if (auto slist = list.lock()) {
+ scoped_lock lock(slist->dataMutex);
+ callbacks = slist->watchers;
+ }
+ }
+
+ Peer p(shared_from_this());
+ for (const auto & w : callbacks)
+ w(listIndex, &p);
+}
+
+void Peer::Priv::runServicesHandler(Service & service, Ref ref)
+{
+ if (auto sptr = speer.lock()) {
+ Service::Context ctx { nullptr };
+
+ sptr->server.localHead.update([&] (const Stored<LocalState> & local) {
+ ctx = Service::Context(new Service::Context::Priv {
+ .ref = ref,
+ .peer = erebos::Peer(shared_from_this()),
+ .local = local,
+ });
+
+ service.handle(ctx);
+ return ctx.local();
+ });
+
+ ctx.runAfterCommitHooks();
}
}
@@ -242,41 +271,53 @@ PeerList::~PeerList() = default;
void PeerList::Priv::push(const shared_ptr<Server::Peer> & speer)
{
- scoped_lock lock(dataMutex);
- size_t s = peers.size();
+ vector<function<void(size_t, const Peer *)>> callbacks;
+ size_t s;
- speer->lpeer.reset(new Peer::Priv);
- speer->lpeer->speer = speer;
- speer->lpeer->list = shared_from_this();
- speer->lpeer->listIndex = s;
+ {
+ scoped_lock lock(dataMutex);
+ s = peers.size();
- Peer p(speer->lpeer);
+ speer->lpeer.reset(new Peer::Priv);
+ speer->lpeer->speer = speer;
+ speer->lpeer->list = shared_from_this();
+ speer->lpeer->listIndex = s;
- peers.push_back(speer->lpeer);
- for (const auto & w : watchers)
+ peers.push_back(speer->lpeer);
+ callbacks = watchers;
+ }
+
+ Peer p(speer->lpeer);
+ for (const auto & w : callbacks)
w(s, &p);
}
size_t PeerList::size() const
{
+ scoped_lock lock(p->dataMutex);
return p->peers.size();
}
Peer PeerList::at(size_t i) const
{
+ scoped_lock lock(p->dataMutex);
return Peer(p->peers.at(i));
}
void PeerList::onUpdate(function<void(size_t, const Peer *)> w)
{
- scoped_lock lock(p->dataMutex);
- for (size_t i = 0; i < p->peers.size(); i++) {
- if (auto speer = p->peers[i]->speer.lock()) {
- Peer peer(speer->lpeer);
- w(i, &peer);
- }
+ vector<Peer> peers;
+
+ {
+ scoped_lock lock(p->dataMutex);
+ for (size_t i = 0; i < p->peers.size(); i++)
+ if (auto speer = p->peers[i]->speer.lock())
+ peers.emplace_back(speer->lpeer);
+ p->watchers.push_back(w);
}
- p->watchers.push_back(w);
+
+ for (size_t i = 0; i < peers.size(); i++)
+ w(i, &peers[i]);
}
@@ -381,6 +422,9 @@ void Server::Priv::doListen()
if (!peer)
continue;
+ vector<shared_ptr<erebos::Peer::Priv>> notifyPeers;
+ vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> readyServices;
+
if (auto header = peer->connection.receive(peer->partStorage)) {
ReplyBuilder reply;
@@ -388,9 +432,9 @@ void Server::Priv::doListen()
shared_lock slock(selfMutex);
handlePacket(*peer, *header, reply);
- peer->updateIdentity(reply);
+ peer->updateIdentity(reply, notifyPeers);
peer->updateChannel(reply);
- peer->updateService(reply);
+ peer->updateService(reply, readyServices);
if (!reply.header().empty())
peer->connection.send(peer->partStorage,
@@ -398,6 +442,11 @@ void Server::Priv::doListen()
peer->connection.trySendOutQueue();
}
+
+ for (const auto & p : notifyPeers)
+ p->notifyWatchers();
+ for (const auto & [ lpeer, service, ref ] : readyServices)
+ lpeer->runServicesHandler(service, ref);
}
}
@@ -452,41 +501,55 @@ Server::Peer * Server::Priv::findPeer(NetworkProtocol::Connection::Id cid) const
Server::Peer & Server::Priv::getPeer(const sockaddr_in6 & paddr)
{
- scoped_lock lock(dataMutex);
+ shared_ptr<Peer> peer;
+ shared_ptr<PeerList::Priv> sptr;
- for (auto & peer : peers)
- if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0)
- return *peer;
-
- auto st = self.ref()->storage().deriveEphemeralStorage();
- shared_ptr<Peer> peer(new Peer {
- .server = *this,
- .connection = protocol.connect(paddr),
- .identity = monostate(),
- .identityUpdates = {},
- .tempStorage = st,
- .partStorage = st.derivePartialStorage(),
- });
- peers.push_back(peer);
- plist.p->push(peer);
+ {
+ scoped_lock lock(dataMutex);
+
+ for (auto & peer : peers)
+ if (memcmp(&peer->connection.peerAddress(), &paddr, sizeof paddr) == 0)
+ return *peer;
+
+ auto st = self.ref()->storage().deriveEphemeralStorage();
+ peer.reset(new Peer {
+ .server = *this,
+ .connection = protocol.connect(paddr),
+ .identity = monostate(),
+ .identityUpdates = {},
+ .tempStorage = st,
+ .partStorage = st.derivePartialStorage(),
+ });
+ peers.push_back(peer);
+ sptr = plist.p;
+ }
+
+ sptr->push(peer);
return *peer;
}
Server::Peer & Server::Priv::addPeer(NetworkProtocol::Connection conn)
{
- scoped_lock lock(dataMutex);
+ shared_ptr<Peer> peer;
+ shared_ptr<PeerList::Priv> sptr;
- auto st = self.ref()->storage().deriveEphemeralStorage();
- shared_ptr<Peer> peer(new Peer {
- .server = *this,
- .connection = move(conn),
- .identity = monostate(),
- .identityUpdates = {},
- .tempStorage = st,
- .partStorage = st.derivePartialStorage(),
- });
- peers.push_back(peer);
- plist.p->push(peer);
+ {
+ scoped_lock lock(dataMutex);
+
+ auto st = self.ref()->storage().deriveEphemeralStorage();
+ peer.reset(new Peer {
+ .server = *this,
+ .connection = move(conn),
+ .identity = monostate(),
+ .identityUpdates = {},
+ .tempStorage = st,
+ .partStorage = st.derivePartialStorage(),
+ });
+ peers.push_back(peer);
+ sptr = plist.p;
+ }
+
+ sptr->push(peer);
return *peer;
}
@@ -655,14 +718,14 @@ void Server::Priv::handleLocalHeadChange(const Head<LocalState> & head)
}
}
-void Server::Peer::updateIdentity(ReplyBuilder &)
+void Server::Peer::updateIdentity(ReplyBuilder &, vector<shared_ptr<erebos::Peer::Priv>> & notifyPeers)
{
if (holds_alternative<shared_ptr<WaitingRef>>(identity)) {
if (auto ref = std::get<shared_ptr<WaitingRef>>(identity)->check())
if (auto id = Identity::load(*ref)) {
identity.emplace<Identity>(*id);
if (lpeer)
- lpeer->notifyWatchers();
+ notifyPeers.push_back(lpeer);
}
}
else if (holds_alternative<Identity>(identity)) {
@@ -684,7 +747,7 @@ void Server::Peer::updateIdentity(ReplyBuilder &)
if (nid != get<Identity>(identity)) {
identity = move(nid);
if (lpeer)
- lpeer->notifyWatchers();
+ notifyPeers.push_back(lpeer);
}
}
}
@@ -742,32 +805,18 @@ void Server::Peer::finalizeChannel(ReplyBuilder & reply, unique_ptr<Channel> ch)
reply.header(NetworkProtocol::Header::AnnounceUpdate { r.digest() });
}
-void Server::Peer::updateService(ReplyBuilder & reply)
+void Server::Peer::updateService(ReplyBuilder & reply, vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> & readyServices)
{
decltype(serviceQueue) next;
for (auto & x : serviceQueue) {
if (auto ref = std::get<1>(x)->check(reply)) {
if (lpeer) {
- Service::Context ctx { nullptr };
-
- server.localHead.update([&] (const Stored<LocalState> & local) {
- ctx = Service::Context(new Service::Context::Priv {
- .ref = *ref,
- .peer = erebos::Peer(lpeer),
- .local = local,
- });
-
- for (auto & svc : server.services) {
- if (svc->uuid() == std::get<UUID>(x)) {
- svc->handle(ctx);
- break;
- }
+ for (auto & svc : server.services) {
+ if (svc->uuid() == std::get<UUID>(x)) {
+ readyServices.emplace_back(lpeer, *svc, *ref);
+ break;
}
-
- return ctx.local();
- });
-
- ctx.runAfterCommitHooks();
+ }
}
} else {
next.push_back(std::move(x));
diff --git a/src/network.h b/src/network.h
index d1fae15..12ec4e1 100644
--- a/src/network.h
+++ b/src/network.h
@@ -57,10 +57,10 @@ struct Server::Peer
shared_ptr<erebos::Peer::Priv> lpeer = nullptr;
- void updateIdentity(ReplyBuilder &);
+ void updateIdentity(ReplyBuilder &, vector<shared_ptr<erebos::Peer::Priv>> & notifyPeers);
void updateChannel(ReplyBuilder &);
void finalizeChannel(ReplyBuilder &, unique_ptr<Channel>);
- void updateService(ReplyBuilder &);
+ void updateService(ReplyBuilder &, vector<tuple<shared_ptr<erebos::Peer::Priv>, Service &, Ref>> & readyServices);
};
struct Peer::Priv : enable_shared_from_this<Peer::Priv>
@@ -70,6 +70,7 @@ struct Peer::Priv : enable_shared_from_this<Peer::Priv>
size_t listIndex;
void notifyWatchers();
+ void runServicesHandler(Service & service, Ref ref);
};
struct PeerList::Priv : enable_shared_from_this<PeerList::Priv>