From ea95f792e086edd8e19ac6fa937bb7e6b00d911f Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 25 Oct 2022 07:08:30 +0800 Subject: [PATCH] refactor: multicall node_info to improve /node speed --- apps/emqx/src/emqx_rpc.erl | 3 + apps/emqx_management/src/emqx_mgmt.erl | 63 ++++++++------- .../src/proto/emqx_management_proto_v3.erl | 80 +++++++++++++++++++ .../test/emqx_mgmt_api_nodes_SUITE.erl | 52 ++++++++++++ 4 files changed, 168 insertions(+), 30 deletions(-) create mode 100644 apps/emqx_management/src/proto/emqx_management_proto_v3.erl diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 939b5395d..ec8fd83de 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -125,6 +125,9 @@ max_client_num() -> emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum). -spec unwrap_erpc(emqx_rpc:erpc(A)) -> A | {error, _Err}. + +unwrap_erpc(Res) when is_list(Res) -> + [unwrap_erpc(R) || R <- Res]; unwrap_erpc({ok, A}) -> A; unwrap_erpc({throw, A}) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 8f73d5767..23804614b 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -118,9 +118,11 @@ list_nodes() -> Running = mria_mnesia:cluster_nodes(running), Stopped = mria_mnesia:cluster_nodes(stopped), DownNodes = lists:map(fun stopped_node_info/1, Stopped), - [{Node, node_info(Node)} || Node <- Running] ++ DownNodes. + [{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes. -lookup_node(Node) -> node_info(Node). +lookup_node(Node) -> + [Info] = node_info([Node]), + Info. node_info() -> {UsedRatio, Total} = get_sys_memory(), @@ -152,8 +154,8 @@ get_sys_memory() -> {0, 0} end. -node_info(Node) -> - wrap_rpc(emqx_management_proto_v2:node_info(Node)). +node_info(Nodes) -> + emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)). stopped_node_info(Node) -> #{name => Node, node_status => 'stopped'}. @@ -163,17 +165,19 @@ stopped_node_info(Node) -> %%-------------------------------------------------------------------- list_brokers() -> - [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()]. + Running = mria_mnesia:running_nodes(), + [{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)]. lookup_broker(Node) -> - broker_info(Node). + [Broker] = broker_info([Node]), + Broker. broker_info() -> Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]), Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}. -broker_info(Node) -> - wrap_rpc(emqx_management_proto_v2:broker_info(Node)). +broker_info(Nodes) -> + emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -183,7 +187,7 @@ get_metrics() -> nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). get_metrics(Node) -> - wrap_rpc(emqx_proto_v1:get_metrics(Node)). + unwrap_rpc(emqx_proto_v1:get_metrics(Node)). get_stats() -> GlobalStatsKeys = @@ -211,7 +215,7 @@ delete_keys(List, [Key | Keys]) -> delete_keys(proplists:delete(Key, List), Keys). get_stats(Node) -> - wrap_rpc(emqx_proto_v1:get_stats(Node)). + unwrap_rpc(emqx_proto_v1:get_stats(Node)). nodes_info_count(PropList) -> NodeCount = @@ -241,7 +245,7 @@ lookup_client({username, Username}, FormatFun) -> ]). lookup_client(Node, Key, {M, F}) -> - case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of + case unwrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of {error, Err} -> {error, Err}; L -> @@ -264,7 +268,7 @@ kickout_client({ClientID, FormatFun}) -> end. kickout_client(Node, ClientId) -> - wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). + unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). @@ -284,14 +288,14 @@ list_client_subscriptions(ClientId) -> end. client_subscriptions(Node, ClientId) -> - {Node, wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. + {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. clean_authz_cache(ClientId) -> Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], check_results(Results). clean_authz_cache(Node, ClientId) -> - wrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)). + unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)). clean_authz_cache_all() -> Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], @@ -308,10 +312,10 @@ wrap_results(Results) -> end. clean_authz_cache_all(Node) -> - wrap_rpc(emqx_proto_v1:clean_authz_cache(Node)). + unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node)). clean_pem_cache_all(Node) -> - wrap_rpc(emqx_proto_v1:clean_pem_cache(Node)). + unwrap_rpc(emqx_proto_v1:clean_pem_cache(Node)). set_ratelimit_policy(ClientId, Policy) -> call_client(ClientId, {ratelimit, Policy}). @@ -357,7 +361,7 @@ do_call_client(ClientId, Req) -> %% @private call_client(Node, ClientId, Req) -> - wrap_rpc(emqx_management_proto_v2:call_client(Node, ClientId, Req)). + unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -376,7 +380,7 @@ do_list_subscriptions() -> end. list_subscriptions(Node) -> - wrap_rpc(emqx_management_proto_v2:list_subscriptions(Node)). + unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -385,7 +389,7 @@ list_subscriptions_via_topic(Topic, FormatFun) -> ]). list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> - case wrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of + case unwrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of {error, Reason} -> {error, Reason}; Result -> M:F(Result) end. @@ -394,7 +398,7 @@ lookup_subscriptions(ClientId) -> lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). lookup_subscriptions(Node, ClientId) -> - wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)). + unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)). %%-------------------------------------------------------------------- %% PubSub @@ -404,7 +408,7 @@ subscribe(ClientId, TopicTables) -> subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case wrap_rpc(emqx_management_proto_v2:subscribe(Node, ClientId, TopicTables)) of + case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -431,7 +435,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case wrap_rpc(emqx_management_proto_v2:unsubscribe(Node, ClientId, Topic)) of + case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -454,7 +458,7 @@ unsubscribe_batch(ClientId, Topics) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. unsubscribe_batch([Node | Nodes], ClientId, Topics) -> - case wrap_rpc(emqx_management_proto_v2:unsubscribe_batch(Node, ClientId, Topics)) of + case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); Re -> Re end; @@ -477,16 +481,16 @@ get_alarms(Type) -> [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()]. get_alarms(Node, Type) -> - add_duration_field(wrap_rpc(emqx_proto_v1:get_alarms(Node, Type))). + add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))). deactivate(Node, Name) -> - wrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)). + unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)). delete_all_deactivated_alarms() -> [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()]. delete_all_deactivated_alarms(Node) -> - wrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)). + unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)). add_duration_field(Alarms) -> Now = erlang:system_time(microsecond), @@ -523,10 +527,9 @@ delete_banned(Who) -> %%-------------------------------------------------------------------- %% Internal Functions. %%-------------------------------------------------------------------- - -wrap_rpc({badrpc, Reason}) -> +unwrap_rpc({badrpc, Reason}) -> {error, Reason}; -wrap_rpc(Res) -> +unwrap_rpc(Res) -> Res. otp_rel() -> @@ -546,7 +549,7 @@ check_row_limit([Tab | Tables], Limit) -> check_results(Results) -> case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; - false -> wrap_rpc(lists:last(Results)) + false -> unwrap_rpc(lists:last(Results)) end. max_row_limit() -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl new file mode 100644 index 000000000..9c9b71012 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl @@ -0,0 +1,80 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.9". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info([node()]) -> emqx_rpc:multicall_result(). +node_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). + +-spec broker_info([node()]) -> emqx_rpc:multicall_result(). +broker_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). diff --git a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl index 4fb512ed0..197e338fd 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl @@ -113,3 +113,55 @@ t_node_metrics_api(_) -> {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(get, BadNodePath) ). + +t_multiple_nodes_api(_) -> + net_kernel:start(['node_api@127.0.0.1', longnames]), + ct:timetrap({seconds, 120}), + snabbkaffe:fix_ct_logging(), + Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([core, core]), + ct:pal("Starting ~p", [Cluster]), + Node1 = emqx_common_test_helpers:start_slave(Name, Opts), + Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), + try + NodesPath = emqx_mgmt_api_test_util:api_path(["nodes"]), + {ok, Nodes} = emqx_mgmt_api_test_util:request_api(get, NodesPath), + NodesResponse = emqx_json:decode(Nodes, [return_maps]), + All = [Node1, Node2, node()], + lists:map( + fun(N) -> + N1 = binary_to_atom(maps:get(<<"node">>, N), utf8), + ?assertEqual(true, lists:member(N1, All)) + end, + NodesResponse + ), + ?assertEqual(3, length(NodesResponse)), + + NodePath = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_list(Node1)]), + {ok, NodeInfo} = emqx_mgmt_api_test_util:request_api(get, NodePath), + NodeNameResponse = + binary_to_atom(maps:get(<<"node">>, emqx_json:decode(NodeInfo, [return_maps])), utf8), + ?assertEqual(Node1, NodeNameResponse) + after + emqx_common_test_helpers:stop_slave(Node1), + emqx_common_test_helpers:stop_slave(Node2) + end, + ok. + +cluster(Specs) -> + Env = [ + {emqx, init_config_load_done, false}, + {emqx, boot_modules, []} + ], + emqx_common_test_helpers:emqx_cluster(Specs, [ + {env, Env}, + {apps, [emqx_conf]}, + {load_schema, false}, + {join_to, true}, + {env_handler, fun + (emqx) -> + application:set_env(emqx, boot_modules, []), + ok; + (_) -> + ok + end} + ]).