From 402f75592e18495e9c73ba51990e92af4a453c1b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 17 Jul 2023 09:45:38 +0800 Subject: [PATCH] chore: move rpc call from emqx_management_proto to emqx_conf_proto --- apps/emqx/priv/bpapi.versions | 2 +- apps/emqx_conf/src/emqx_conf.erl | 18 +-- apps/emqx_conf/src/emqx_conf_app.erl | 4 +- .../src/proto/emqx_conf_proto_v3.erl | 119 ++++++++++++++++++ apps/emqx_management/src/emqx_mgmt.erl | 16 +-- .../src/emqx_mgmt_api_configs.erl | 7 +- .../src/emqx_mgmt_api_listeners.erl | 2 +- .../src/proto/emqx_management_proto_v5.erl | 96 -------------- .../test/emqx_mgmt_api_configs_SUITE.erl | 4 +- 9 files changed, 146 insertions(+), 122 deletions(-) create mode 100644 apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl delete mode 100644 apps/emqx_management/src/proto/emqx_management_proto_v5.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 21a8bec54..e13f60654 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -12,6 +12,7 @@ {emqx_cm,2}. {emqx_conf,1}. {emqx_conf,2}. +{emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_eviction_agent,1}. @@ -28,7 +29,6 @@ {emqx_management,2}. {emqx_management,3}. {emqx_management,4}. -{emqx_management,5}. {emqx_metrics,1}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index d5f39bcb0..1efeb4d69 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -71,7 +71,7 @@ get_raw(KeyPath) -> %% @doc Returns all values in the cluster. -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> - {ResL, []} = emqx_conf_proto_v2:get_all(KeyPath), + {ResL, []} = emqx_conf_proto_v3:get_all(KeyPath), maps:from_list(ResL). %% @doc Returns the specified node's KeyPath, or exception if not found @@ -79,14 +79,14 @@ get_all(KeyPath) -> get_by_node(Node, KeyPath) when Node =:= node() -> emqx:get_config(KeyPath); get_by_node(Node, KeyPath) -> - emqx_conf_proto_v2:get_config(Node, KeyPath). + emqx_conf_proto_v3:get_config(Node, KeyPath). %% @doc Returns the specified node's KeyPath, or the default value if not found -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term(). get_by_node(Node, KeyPath, Default) when Node =:= node() -> emqx:get_config(KeyPath, Default); get_by_node(Node, KeyPath, Default) -> - emqx_conf_proto_v2:get_config(Node, KeyPath, Default). + emqx_conf_proto_v3:get_config(Node, KeyPath, Default). %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term(). @@ -101,7 +101,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts). + emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -114,7 +114,7 @@ update(KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); update(Node, KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts). + emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts). %% @doc Mark the specified key path as tombstone tombstone(KeyPath, Opts) -> @@ -124,7 +124,7 @@ tombstone(KeyPath, Opts) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - emqx_conf_proto_v2:remove_config(KeyPath, Opts). + emqx_conf_proto_v3:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -132,13 +132,13 @@ remove(KeyPath, Opts) -> remove(Node, KeyPath, Opts) when Node =:= node() -> emqx:remove_config(KeyPath, Opts#{override_to => local}); remove(Node, KeyPath, Opts) -> - emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts). + emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts). %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - emqx_conf_proto_v2:reset(KeyPath, Opts). + emqx_conf_proto_v3:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -146,7 +146,7 @@ reset(KeyPath, Opts) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - emqx_conf_proto_v2:reset(Node, KeyPath, Opts). + emqx_conf_proto_v3:reset(Node, KeyPath, Opts). %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 0a486c829..7addb3823 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -137,7 +137,7 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> - {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), + {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), case (Failed =/= [] orelse NotReady =/= []) of @@ -284,7 +284,7 @@ conf_sort({ok, _}, {ok, _}) -> false. sync_data_from_node(Node) -> - case emqx_conf_proto_v2:sync_data_from_node(Node) of + case emqx_conf_proto_v3:sync_data_from_node(Node) of {ok, DataBin} -> case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of {ok, []} -> diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl new file mode 100644 index 000000000..a2719bc8e --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -0,0 +1,119 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + sync_data_from_node/1, + get_config/2, + get_config/3, + get_all/1, + + update/3, + update/4, + remove_config/2, + remove_config/3, + + reset/2, + reset/3, + + get_override_config_file/1 +]). + +-export([get_hocon_config/1, get_hocon_config/2]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.1". + +-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +sync_data_from_node(Node) -> + rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). +-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. + +-spec get_config(node(), emqx_utils_maps:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update( + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update( + node(), + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). + +-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). +get_override_config_file(Nodes) -> + rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). + +-spec get_hocon_config(node()) -> map() | {badrpc, _}. +get_hocon_config(Node) -> + rpc:call(Node, emqx_conf_cli, get_config, []). + +-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. +get_hocon_config(Node, Key) -> + rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 6b1a8d245..2f261c0d5 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -179,7 +179,7 @@ get_sys_memory() -> end. node_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)). stopped_node_info(Node) -> {Node, #{node => Node, node_status => 'stopped', role => core}}. @@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) -> M#{K => iolist_to_binary(V)}. broker_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -330,7 +330,7 @@ kickout_client(Node, ClientId) -> kickout_clients(ClientIds) when is_list(ClientIds) -> F = fun(Node) -> - emqx_management_proto_v5:kickout_clients(Node, ClientIds) + emqx_management_proto_v4:kickout_clients(Node, ClientIds) end, Results = lists:map(F, emqx:running_nodes()), case lists:filter(fun(Res) -> Res =/= ok end, Results) of @@ -446,7 +446,7 @@ do_call_client(ClientId, Req) -> %% @private call_client(Node, ClientId, Req) -> - unwrap_rpc(emqx_management_proto_v5:call_client(Node, ClientId, Req)). + unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -459,7 +459,7 @@ do_list_subscriptions() -> throw(not_implemented). list_subscriptions(Node) -> - unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)). + unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) -> subscribe(emqx:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of + case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of + case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. unsubscribe_batch([Node | Nodes], ClientId, Topics) -> - case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of + case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); Re -> Re end; diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 7c5a9ffd9..b644a7dae 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -372,11 +372,12 @@ find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Pre end end. +%% Should deprecated json v1 since 5.2.0 get_configs_v1(QueryStr) -> Node = maps:get(<<"node">>, QueryStr, node()), case lists:member(Node, emqx:running_nodes()) andalso - emqx_management_proto_v5:get_full_config(Node) + emqx_management_proto_v4:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), @@ -393,9 +394,9 @@ get_configs_v2(QueryStr) -> Conf = case maps:find(<<"key">>, QueryStr) of error -> - emqx_management_proto_v5:get_full_config_v2(Node); + emqx_conf_proto_v3:get_hocon_config(Node); {ok, Key} -> - emqx_management_proto_v5:get_config_v2(Node, atom_to_binary(Key)) + emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key)) end, { 200, diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 3db52b08c..90fb1f98e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -515,7 +515,7 @@ list_listeners() -> lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]). list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v5:list_listeners(Node)). + wrap_rpc(emqx_management_proto_v4:list_listeners(Node)). listener_status_by_id(NodeL) -> Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v5.erl b/apps/emqx_management/src/proto/emqx_management_proto_v5.erl deleted file mode 100644 index e0a2ec6e2..000000000 --- a/apps/emqx_management/src/proto/emqx_management_proto_v5.erl +++ /dev/null @@ -1,96 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_management_proto_v5). - --behaviour(emqx_bpapi). - --export([ - introduced_in/0, - - node_info/1, - broker_info/1, - list_subscriptions/1, - - list_listeners/1, - subscribe/3, - unsubscribe/3, - unsubscribe_batch/3, - - call_client/3, - - get_full_config/1, - get_full_config_v2/1, - get_config_v2/2, - - kickout_clients/2 -]). - --include_lib("emqx/include/bpapi.hrl"). - -introduced_in() -> - "5.1.1". - --spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> - {unsubscribe, _} | {error, _} | {badrpc, _}. -unsubscribe_batch(Node, ClientId, Topics) -> - rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). - --spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()). -node_info(Nodes) -> - erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). - --spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()). -broker_info(Nodes) -> - erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). - --spec list_subscriptions(node()) -> [map()] | {badrpc, _}. -list_subscriptions(Node) -> - rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). - --spec list_listeners(node()) -> map() | {badrpc, _}. -list_listeners(Node) -> - rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). - --spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> - {subscribe, _} | {error, atom()} | {badrpc, _}. -subscribe(Node, ClientId, TopicTables) -> - rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). - --spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> - {unsubscribe, _} | {error, _} | {badrpc, _}. -unsubscribe(Node, ClientId, Topic) -> - rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). - --spec call_client(node(), emqx_types:clientid(), term()) -> term(). -call_client(Node, ClientId, Req) -> - rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). - --spec get_full_config(node()) -> map() | list() | {badrpc, _}. -get_full_config(Node) -> - rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). - --spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. -kickout_clients(Node, ClientIds) -> - rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). - --spec get_full_config_v2(node()) -> map() | {badrpc, _}. -get_full_config_v2(Node) -> - rpc:call(Node, emqx_conf_cli, get_config, []). - --spec get_config_v2(node(), binary()) -> map() | {badrpc, _}. -get_config_v2(Node, Key) -> - rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 99048bdfd..f853adebf 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -273,7 +273,7 @@ t_configs_node({'init', Config}) -> Node = node(), meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end), meck:expect( - emqx_management_proto_v5, + emqx_management_proto_v4, get_full_config, fun (Node0) when Node0 =:= Node -> <<"\"self\"">>; @@ -283,7 +283,7 @@ t_configs_node({'init', Config}) -> ), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v5]); + meck:unload([emqx, emqx_management_proto_v4]); t_configs_node(_) -> Node = atom_to_list(node()),