perf(mgmt): optimize bulk subscribe when registry is enabled

This commit is contained in:
Thales Macedo Garitezi 2024-06-27 09:50:04 -03:00
parent c49900af50
commit 0b329dbf06
1 changed files with 26 additions and 8 deletions

View File

@ -567,7 +567,18 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
subscribe(ClientId, TopicTables) -> subscribe(ClientId, TopicTables) ->
subscribe(emqx:running_nodes(), ClientId, TopicTables). case emqx_cm_registry:is_enabled() of
false ->
subscribe(emqx:running_nodes(), ClientId, TopicTables);
true ->
with_client_node(
ClientId,
{error, channel_not_found},
fun(Node) ->
subscribe([Node], ClientId, TopicTables)
end
)
end.
subscribe([Node | Nodes], ClientId, TopicTables) -> subscribe([Node | Nodes], ClientId, TopicTables) ->
case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
@ -698,19 +709,26 @@ lookup_running_client(ClientId, FormatFun) ->
|| Node <- emqx:running_nodes() || Node <- emqx:running_nodes()
]); ]);
true -> true ->
case emqx_cm_registry:lookup_channels(ClientId) of with_client_node(
[ChanPid | _] -> ClientId,
Node = node(ChanPid), _WhenNotFound = [],
lookup_client(Node, {clientid, ClientId}, FormatFun); fun(Node) -> lookup_client(Node, {clientid, ClientId}, FormatFun) end
[] -> )
[]
end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions. %% Internal Functions.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
with_client_node(ClientId, WhenNotFound, Fn) ->
case emqx_cm_registry:lookup_channels(ClientId) of
[ChanPid | _] ->
Node = node(ChanPid),
Fn(Node);
[] ->
WhenNotFound
end.
unwrap_rpc({badrpc, Reason}) -> unwrap_rpc({badrpc, Reason}) ->
{error, Reason}; {error, Reason};
unwrap_rpc(Res) -> unwrap_rpc(Res) ->