diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 36c5f247e..11ac8f582 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -39,6 +39,7 @@ {emqx_management,2}. {emqx_management,3}. {emqx_management,4}. +{emqx_management,5}. {emqx_metrics,1}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 992905e48..9428ef8e2 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -18,6 +18,7 @@ -include("emqx_mgmt.hrl"). -include_lib("emqx/include/emqx_cm.hrl"). +-include_lib("emqx/include/logger.hrl"). -elvis([{elvis_style, invalid_dynamic_call, disable}]). -elvis([{elvis_style, god_modules, disable}]). @@ -117,6 +118,13 @@ -elvis([{elvis_style, god_modules, disable}]). +-define(maybe_log_node_errors(LogData, Errors), + case Errors of + [] -> ok; + _ -> ?SLOG(error, (LogData)#{node_errors => Errors}) + end +). + %%-------------------------------------------------------------------- %% Node Info %%-------------------------------------------------------------------- @@ -185,7 +193,7 @@ get_sys_memory() -> end. node_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)). stopped_node_info(Node) -> {Node, #{node => Node, node_status => 'stopped', role => core}}. @@ -248,7 +256,7 @@ convert_broker_info({K, V}, M) -> M#{K => iolist_to_binary(V)}. broker_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -361,7 +369,7 @@ kickout_client(Node, ClientId) -> kickout_clients(ClientIds) when is_list(ClientIds) -> F = fun(Node) -> - emqx_management_proto_v4:kickout_clients(Node, ClientIds) + emqx_management_proto_v5:kickout_clients(Node, ClientIds) end, Results = lists:map(F, emqx:running_nodes()), case lists:filter(fun(Res) -> Res =/= ok end, Results) of @@ -461,17 +469,34 @@ set_keepalive(_ClientId, _Interval) -> %% @private call_client(ClientId, Req) -> - Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()], - Expected = lists:filter( + case emqx_cm_registry:is_enabled() of + true -> + do_call_client(ClientId, Req); + false -> + call_client_on_all_nodes(ClientId, Req) + end. + +call_client_on_all_nodes(ClientId, Req) -> + Nodes = emqx:running_nodes(), + Results = call_client(Nodes, ClientId, Req), + {Expected, Errs} = lists:foldr( fun - ({error, _}) -> false; - (_) -> true + ({_N, {error, not_found}}, Acc) -> Acc; + ({_N, {error, _}} = Err, {OkAcc, ErrAcc}) -> {OkAcc, [Err | ErrAcc]}; + ({_N, OkRes}, {OkAcc, ErrAcc}) -> {[OkRes | OkAcc], ErrAcc} end, - Results + {[], []}, + lists:zip(Nodes, Results) ), + ?maybe_log_node_errors(#{msg => "call_client_failed", request => Req}, Errs), case Expected of - [] -> {error, not_found}; - [Result | _] -> Result + [] -> + case Errs of + [] -> {error, not_found}; + [{_Node, FirstErr} | _] -> FirstErr + end; + [Result | _] -> + Result end. %% @private @@ -491,8 +516,8 @@ do_call_client(ClientId, Req) -> end. %% @private -call_client(Node, ClientId, Req) -> - unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)). +call_client(Nodes, ClientId, Req) -> + emqx_rpc:unwrap_erpc(emqx_management_proto_v5:call_client(Nodes, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -505,7 +530,7 @@ do_list_subscriptions() -> throw(not_implemented). list_subscriptions(Node) -> - unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)). + unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -527,7 +552,7 @@ subscribe(ClientId, TopicTables) -> subscribe(emqx:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of + case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -554,7 +579,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of + case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -577,7 +602,7 @@ unsubscribe_batch(ClientId, Topics) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. unsubscribe_batch([Node | Nodes], ClientId, Topics) -> - case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of + case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); Re -> Re end; @@ -656,6 +681,7 @@ lookup_running_client(ClientId, FormatFun) -> %%-------------------------------------------------------------------- %% Internal Functions. %%-------------------------------------------------------------------- + unwrap_rpc({badrpc, Reason}) -> {error, Reason}; unwrap_rpc(Res) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index a886d716f..f013dfcd1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -407,7 +407,7 @@ get_configs_v1(QueryStr) -> Node = maps:get(<<"node">>, QueryStr, node()), case lists:member(Node, emqx:running_nodes()) andalso - emqx_management_proto_v4:get_full_config(Node) + emqx_management_proto_v5:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 2d2982b51..1c581514a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -516,7 +516,7 @@ list_listeners() -> lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]). list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v4:list_listeners(Node)). + wrap_rpc(emqx_management_proto_v5:list_listeners(Node)). listener_status_by_id(NodeL) -> Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v5.erl b/apps/emqx_management/src/proto/emqx_management_proto_v5.erl new file mode 100644 index 000000000..eeaa6be02 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v5.erl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v5). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1, + + kickout_clients/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.6.0". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()). +node_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). + +-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()). +broker_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client([node()], emqx_types:clientid(), term()) -> emqx_rpc:erpc_multicall(term()). +call_client(Nodes, ClientId, Req) -> + erpc:multicall(Nodes, emqx_mgmt, do_call_client, [ClientId, Req], 30000). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 9ce737353..86237c17b 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -28,14 +28,19 @@ all() -> [ {group, persistence_disabled}, - {group, persistence_enabled} + {group, persistence_enabled}, + {group, cm_registry_enabled}, + {group, cm_registry_disabled} ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + CMRegistryTCs = [t_call_client_cluster], + TCs = emqx_common_test_helpers:all(?MODULE) -- CMRegistryTCs, [ {persistence_disabled, [], TCs}, - {persistence_enabled, [], [t_persist_list_subs]} + {persistence_enabled, [], [t_persist_list_subs]}, + {cm_registry_enabled, CMRegistryTCs}, + {cm_registry_disabled, CMRegistryTCs} ]. init_per_group(persistence_disabled, Config) -> @@ -66,10 +71,17 @@ init_per_group(persistence_enabled, Config) -> [ {apps, Apps} | Config - ]. + ]; +init_per_group(cm_registry_enabled, Config) -> + [{emqx_config, "broker.enable_session_registry = true"} | Config]; +init_per_group(cm_registry_disabled, Config) -> + [{emqx_config, "broker.enable_session_registry = false"} | Config]. end_per_group(_Grp, Config) -> - emqx_cth_suite:stop(?config(apps, Config)). + case ?config(apps, Config) of + undefined -> ok; + Apps -> emqx_cth_suite:stop(Apps) + end. init_per_suite(Config) -> Config. @@ -447,6 +459,83 @@ t_persist_list_subs(_) -> %% clients: VerifySubs(). +t_call_client_cluster(Config) -> + [Node1, Node2] = ?config(cluster, Config), + [Node1ClientId, Node2ClientId] = ?config(client_ids, Config), + ?assertMatch( + {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId)) + ), + + case proplists:get_value(name, ?config(tc_group_properties, Config)) of + cm_registry_disabled -> + %% Simulating crashes that must be handled by erpc multicall + ?assertMatch( + {error, _}, + rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId)) + ), + ?assertMatch( + {error, _}, + rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId)) + ); + cm_registry_enabled -> + %% Direct call to remote pid is expected to crash + ?assertMatch( + {badrpc, {'EXIT', _}}, + rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId)) + ), + ?assertMatch( + {badrpc, {'EXIT', _}}, + rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId)) + ); + _ -> + ok + end, + + NotFoundClientId = <<"no_such_client_id">>, + ?assertEqual( + {error, not_found}, + rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId)) + ), + ?assertEqual( + {error, not_found}, + rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId)) + ). + +t_call_client_cluster(init, Config) -> + Apps = [{emqx, ?config(emqx_config, Config)}, emqx_management], + [Node1, Node2] = + Cluster = emqx_cth_cluster:start( + [ + {list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}}, + {list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + {ok, Node1Client, Node1ClientId} = connect_client(Node1), + {ok, Node2Client, Node2ClientId} = connect_client(Node2), + %% They may exit during the test due to simulated crashes + unlink(Node1Client), + unlink(Node2Client), + [ + {cluster, Cluster}, + {client_ids, [Node1ClientId, Node2ClientId]}, + {client_pids, [Node1Client, Node2Client]} + | Config + ]; +t_call_client_cluster('end', Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + [exit(ClientPid, kill) || ClientPid <- ?config(client_pids, Config)], + ok. + %%% helpers ident(Arg) -> Arg. @@ -462,3 +551,24 @@ setup_clients(Config) -> disconnect_clients(Config) -> Clients = ?config(clients, Config), lists:foreach(fun emqtt:disconnect/1, Clients). + +get_mqtt_port(Node) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), + Port. + +connect_client(Node) -> + Port = get_mqtt_port(Node), + ClientId = <<(atom_to_binary(Node))/binary, "_client">>, + {ok, Client} = emqtt:start_link([ + {port, Port}, + {proto_ver, v5}, + {clientid, ClientId} + ]), + {ok, _} = emqtt:connect(Client), + {ok, Client, ClientId}. + +client_msgs_args(ClientId) -> + [mqueue_msgs, ClientId, #{limit => 10, continuation => none}]. + +client_msgs_bad_args(ClientId) -> + [mqueue_msgs, ClientId, "bad_page_params"]. diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index e2302be93..2c90c9dac 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -287,12 +287,12 @@ t_configs_node({'init', Config}) -> (other_node, _) -> <<"log=2">>; (bad_node, _) -> {badrpc, bad} end, - meck:expect(emqx_management_proto_v4, get_full_config, F), + meck:expect(emqx_management_proto_v5, get_full_config, F), meck:expect(emqx_conf_proto_v3, get_hocon_config, F2), meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]); + meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]); t_configs_node(_) -> Node = atom_to_list(node()),