From 0b329dbf063c1e67bdb520a8050096740532c495 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jun 2024 09:50:04 -0300 Subject: [PATCH] perf(mgmt): optimize bulk subscribe when registry is enabled --- apps/emqx_management/src/emqx_mgmt.erl | 34 ++++++++++++++++++++------ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 1ea88e9e5..afb88692e 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -567,7 +567,18 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(emqx:running_nodes(), ClientId, TopicTables). + case emqx_cm_registry:is_enabled() of + false -> + subscribe(emqx:running_nodes(), ClientId, TopicTables); + true -> + with_client_node( + ClientId, + {error, channel_not_found}, + fun(Node) -> + subscribe([Node], ClientId, TopicTables) + end + ) + end. subscribe([Node | Nodes], ClientId, TopicTables) -> case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of @@ -698,19 +709,26 @@ lookup_running_client(ClientId, FormatFun) -> || Node <- emqx:running_nodes() ]); true -> - case emqx_cm_registry:lookup_channels(ClientId) of - [ChanPid | _] -> - Node = node(ChanPid), - lookup_client(Node, {clientid, ClientId}, FormatFun); - [] -> - [] - end + with_client_node( + ClientId, + _WhenNotFound = [], + fun(Node) -> lookup_client(Node, {clientid, ClientId}, FormatFun) end + ) end. %%-------------------------------------------------------------------- %% Internal Functions. %%-------------------------------------------------------------------- +with_client_node(ClientId, WhenNotFound, Fn) -> + case emqx_cm_registry:lookup_channels(ClientId) of + [ChanPid | _] -> + Node = node(ChanPid), + Fn(Node); + [] -> + WhenNotFound + end. + unwrap_rpc({badrpc, Reason}) -> {error, Reason}; unwrap_rpc(Res) ->