diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 6ea2b5549..6b1a8d245 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -179,7 +179,7 @@ get_sys_memory() -> end. node_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)). stopped_node_info(Node) -> {Node, #{node => Node, node_status => 'stopped', role => core}}. @@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) -> M#{K => iolist_to_binary(V)}. broker_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -330,7 +330,7 @@ kickout_client(Node, ClientId) -> kickout_clients(ClientIds) when is_list(ClientIds) -> F = fun(Node) -> - emqx_management_proto_v4:kickout_clients(Node, ClientIds) + emqx_management_proto_v5:kickout_clients(Node, ClientIds) end, Results = lists:map(F, emqx:running_nodes()), case lists:filter(fun(Res) -> Res =/= ok end, Results) of @@ -446,7 +446,7 @@ do_call_client(ClientId, Req) -> %% @private call_client(Node, ClientId, Req) -> - unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)). + unwrap_rpc(emqx_management_proto_v5:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -459,7 +459,7 @@ do_list_subscriptions() -> throw(not_implemented). list_subscriptions(Node) -> - unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)). + unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) -> subscribe(emqx:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of + case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of + case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. unsubscribe_batch([Node | Nodes], ClientId, Topics) -> - case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of + case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); Re -> Re end; diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index f2e336d0f..7c5a9ffd9 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -350,7 +350,7 @@ configs(put, #{body := Conf, query_string := #{<<"mode">> := Mode}}, _Req) -> {error, Errors} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Errors)}} end. -find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Perferences) > 0 -> +find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Preferences) > 0 -> AcceptVal = maps:get(<<"accept">>, Headers, <<"*/*">>), %% Multiple types, weighted with the quality value syntax: %% Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8 @@ -363,11 +363,11 @@ find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Per ), case lists:member(<<"*/*">>, Accepts) of true -> - {ok, lists:nth(1, Perferences)}; + {ok, lists:nth(1, Preferences)}; false -> - Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Perferences), + Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Preferences), case Found of - [] -> {error, no_suitalbe_accept}; + [] -> {error, no_suitable_accept}; _ -> {ok, lists:nth(1, Found)} end end. @@ -376,7 +376,7 @@ get_configs_v1(QueryStr) -> Node = maps:get(<<"node">>, QueryStr, node()), case lists:member(Node, emqx:running_nodes()) andalso - emqx_management_proto_v2:get_full_config(Node) + emqx_management_proto_v5:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), @@ -389,10 +389,13 @@ get_configs_v1(QueryStr) -> end. get_configs_v2(QueryStr) -> + Node = maps:get(<<"node">>, QueryStr, node()), Conf = case maps:find(<<"key">>, QueryStr) of - error -> emqx_conf_cli:get_config(); - {ok, Key} -> emqx_conf_cli:get_config(atom_to_binary(Key)) + error -> + emqx_management_proto_v5:get_full_config_v2(Node); + {ok, Key} -> + emqx_management_proto_v5:get_config_v2(Node, atom_to_binary(Key)) end, { 200, diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 719d0913d..3db52b08c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -515,7 +515,7 @@ list_listeners() -> lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]). list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v2:list_listeners(Node)). + wrap_rpc(emqx_management_proto_v5:list_listeners(Node)). listener_status_by_id(NodeL) -> Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v5.erl b/apps/emqx_management/src/proto/emqx_management_proto_v5.erl new file mode 100644 index 000000000..e0a2ec6e2 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v5.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v5). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1, + get_full_config_v2/1, + get_config_v2/2, + + kickout_clients/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.1". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()). +node_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). + +-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()). +broker_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). + +-spec get_full_config_v2(node()) -> map() | {badrpc, _}. +get_full_config_v2(Node) -> + rpc:call(Node, emqx_conf_cli, get_config, []). + +-spec get_config_v2(node(), binary()) -> map() | {badrpc, _}. +get_config_v2(Node, Key) -> + rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 43554c9ff..99048bdfd 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -273,7 +273,7 @@ t_configs_node({'init', Config}) -> Node = node(), meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end), meck:expect( - emqx_management_proto_v2, + emqx_management_proto_v5, get_full_config, fun (Node0) when Node0 =:= Node -> <<"\"self\"">>; @@ -283,7 +283,7 @@ t_configs_node({'init', Config}) -> ), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v2]); + meck:unload([emqx, emqx_management_proto_v5]); t_configs_node(_) -> Node = atom_to_list(node()),