Merge pull request #11274 from zhongwencool/fix-get-config-from-target-node

fix: get_config return target node's node conf
This commit is contained in:
JianBo He 2023-07-19 16:01:13 +08:00 committed by GitHub
commit 89b5cda2a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 181 additions and 40 deletions

View File

@ -12,6 +12,7 @@
{emqx_cm,2}.
{emqx_conf,1}.
{emqx_conf,2}.
{emqx_conf,3}.
{emqx_dashboard,1}.
{emqx_delayed,1}.
{emqx_eviction_agent,1}.

View File

@ -71,7 +71,7 @@ get_raw(KeyPath) ->
%% @doc Returns all values in the cluster.
-spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
get_all(KeyPath) ->
{ResL, []} = emqx_conf_proto_v2:get_all(KeyPath),
{ResL, []} = emqx_conf_proto_v3:get_all(KeyPath),
maps:from_list(ResL).
%% @doc Returns the specified node's KeyPath, or exception if not found
@ -79,14 +79,14 @@ get_all(KeyPath) ->
get_by_node(Node, KeyPath) when Node =:= node() ->
emqx:get_config(KeyPath);
get_by_node(Node, KeyPath) ->
emqx_conf_proto_v2:get_config(Node, KeyPath).
emqx_conf_proto_v3:get_config(Node, KeyPath).
%% @doc Returns the specified node's KeyPath, or the default value if not found
-spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
emqx:get_config(KeyPath, Default);
get_by_node(Node, KeyPath, Default) ->
emqx_conf_proto_v2:get_config(Node, KeyPath, Default).
emqx_conf_proto_v3:get_config(Node, KeyPath, Default).
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
-spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
@ -101,7 +101,7 @@ get_node_and_config(KeyPath) ->
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts).
emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts).
%% @doc Update the specified node's key path in local-override.conf.
-spec update(
@ -114,7 +114,7 @@ update(KeyPath, UpdateReq, Opts) ->
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
update(Node, KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts).
%% @doc Mark the specified key path as tombstone
tombstone(KeyPath, Opts) ->
@ -124,7 +124,7 @@ tombstone(KeyPath, Opts) ->
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts) ->
emqx_conf_proto_v2:remove_config(KeyPath, Opts).
emqx_conf_proto_v3:remove_config(KeyPath, Opts).
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@ -132,13 +132,13 @@ remove(KeyPath, Opts) ->
remove(Node, KeyPath, Opts) when Node =:= node() ->
emqx:remove_config(KeyPath, Opts#{override_to => local});
remove(Node, KeyPath, Opts) ->
emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts).
emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts).
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
-spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
emqx_conf_proto_v2:reset(KeyPath, Opts).
emqx_conf_proto_v3:reset(KeyPath, Opts).
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@ -146,7 +146,7 @@ reset(KeyPath, Opts) ->
reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) ->
emqx_conf_proto_v2:reset(Node, KeyPath, Opts).
emqx_conf_proto_v3:reset(Node, KeyPath, Opts).
%% @doc Called from build script.
%% TODO: move to a external escript after all refactoring is done

View File

@ -137,7 +137,7 @@ sync_cluster_conf() ->
%% @private Some core nodes are running, try to sync the cluster config from them.
sync_cluster_conf2(Nodes) ->
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
{Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) of
@ -284,7 +284,7 @@ conf_sort({ok, _}, {ok, _}) ->
false.
sync_data_from_node(Node) ->
case emqx_conf_proto_v2:sync_data_from_node(Node) of
case emqx_conf_proto_v3:sync_data_from_node(Node) of
{ok, DataBin} ->
case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
{ok, []} ->

View File

@ -0,0 +1,119 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
sync_data_from_node/1,
get_config/2,
get_config/3,
get_all/1,
update/3,
update/4,
remove_config/2,
remove_config/3,
reset/2,
reset/3,
get_override_config_file/1
]).
-export([get_hocon_config/1, get_hocon_config/2]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.1.1".
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
sync_data_from_node(Node) ->
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...].
-spec get_config(node(), emqx_utils_maps:config_key_path()) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_config, [KeyPath]).
-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath, Default) ->
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result().
get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(
node(),
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
reset(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
get_override_config_file(Nodes) ->
rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
-spec get_hocon_config(node()) -> map() | {badrpc, _}.
get_hocon_config(Node) ->
rpc:call(Node, emqx_conf_cli, get_config, []).
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
get_hocon_config(Node, Key) ->
rpc:call(Node, emqx_conf_cli, get_config, [Key]).

View File

@ -179,7 +179,7 @@ get_sys_memory() ->
end.
node_info(Nodes) ->
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)).
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)).
stopped_node_info(Node) ->
{Node, #{node => Node, node_status => 'stopped', role => core}}.
@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) ->
M#{K => iolist_to_binary(V)}.
broker_info(Nodes) ->
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)).
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)).
%%--------------------------------------------------------------------
%% Metrics and Stats
@ -446,7 +446,7 @@ do_call_client(ClientId, Req) ->
%% @private
call_client(Node, ClientId, Req) ->
unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)).
unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)).
%%--------------------------------------------------------------------
%% Subscriptions
@ -459,7 +459,7 @@ do_list_subscriptions() ->
throw(not_implemented).
list_subscriptions(Node) ->
unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)).
list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([
@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) ->
subscribe(emqx:running_nodes(), ClientId, TopicTables).
subscribe([Node | Nodes], ClientId, TopicTables) ->
case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
{subscribe, Res} -> {subscribe, Res, Node}
end;
@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) ->
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, channel_not_found}.
unsubscribe([Node | Nodes], ClientId, Topic) ->
case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of
case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
Re -> Re
end;
@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) ->
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, channel_not_found}.
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of
case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
Re -> Re
end;

View File

@ -350,7 +350,7 @@ configs(put, #{body := Conf, query_string := #{<<"mode">> := Mode}}, _Req) ->
{error, Errors} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Errors)}}
end.
find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Perferences) > 0 ->
find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Preferences) > 0 ->
AcceptVal = maps:get(<<"accept">>, Headers, <<"*/*">>),
%% Multiple types, weighted with the quality value syntax:
%% Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8
@ -363,20 +363,27 @@ find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Per
),
case lists:member(<<"*/*">>, Accepts) of
true ->
{ok, lists:nth(1, Perferences)};
{ok, lists:nth(1, Preferences)};
false ->
Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Perferences),
Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Preferences),
case Found of
[] -> {error, no_suitalbe_accept};
[] -> {error, no_suitable_accept};
_ -> {ok, lists:nth(1, Found)}
end
end.
%% To return a JSON formatted configuration file, which is used to be compatible with the already
%% implemented `GET /configs` in the old versions 5.0 and 5.1.
%%
%% In e5.1.1, we support to return a hocon configuration file by `get_configs_v2/1`. It's more
%% useful for the user to read or reload the configuration file via HTTP API.
%%
%% The `get_configs_v1/1` should be deprecated since 5.2.0.
get_configs_v1(QueryStr) ->
Node = maps:get(<<"node">>, QueryStr, node()),
case
lists:member(Node, emqx:running_nodes()) andalso
emqx_management_proto_v2:get_full_config(Node)
emqx_management_proto_v4:get_full_config(Node)
of
false ->
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
@ -389,10 +396,13 @@ get_configs_v1(QueryStr) ->
end.
get_configs_v2(QueryStr) ->
Node = maps:get(<<"node">>, QueryStr, node()),
Conf =
case maps:find(<<"key">>, QueryStr) of
error -> emqx_conf_cli:get_config();
{ok, Key} -> emqx_conf_cli:get_config(atom_to_binary(Key))
error ->
emqx_conf_proto_v3:get_hocon_config(Node);
{ok, Key} ->
emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key))
end,
{
200,

View File

@ -515,7 +515,7 @@ list_listeners() ->
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
list_listeners(Node) ->
wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).
wrap_rpc(emqx_management_proto_v4:list_listeners(Node)).
listener_status_by_id(NodeL) ->
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),

View File

@ -272,18 +272,22 @@ t_dashboard(_Config) ->
t_configs_node({'init', Config}) ->
Node = node(),
meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
meck:expect(
emqx_management_proto_v2,
get_full_config,
fun
(Node0) when Node0 =:= Node -> <<"\"self\"">>;
(other_node) -> <<"\"other\"">>;
(bad_node) -> {badrpc, bad}
end
),
F = fun
(Node0) when Node0 =:= Node -> <<"\"self\"">>;
(other_node) -> <<"\"other\"">>;
(bad_node) -> {badrpc, bad}
end,
F2 = fun
(Node0, _) when Node0 =:= Node -> <<"log=1">>;
(other_node, _) -> <<"log=2">>;
(bad_node, _) -> {badrpc, bad}
end,
meck:expect(emqx_management_proto_v4, get_full_config, F),
meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
Config;
t_configs_node({'end', _}) ->
meck:unload([emqx, emqx_management_proto_v2]);
meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]);
t_configs_node(_) ->
Node = atom_to_list(node()),
@ -296,7 +300,10 @@ t_configs_node(_) ->
{_, _, Body} = ExpRes,
?assertMatch(#{<<"code">> := <<"NOT_FOUND">>}, emqx_utils_json:decode(Body, [return_maps])),
?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")).
?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")),
?assertEqual({ok, #{<<"log">> => 1}}, get_configs_with_binary("log", Node)),
?assertEqual({ok, #{<<"log">> => 2}}, get_configs_with_binary("log", "other_node")).
%% v2 version binary
t_configs_key(_Config) ->
@ -386,12 +393,16 @@ get_configs_with_json(Node, Opts) ->
end.
get_configs_with_binary(Key) ->
get_configs_with_binary(Key, atom_to_list(node())).
get_configs_with_binary(Key, Node) ->
Path0 = "configs?node=" ++ Node,
Path =
case Key of
undefined -> ["configs"];
_ -> ["configs?key=" ++ Key]
undefined -> Path0;
_ -> Path0 ++ "&key=" ++ Key
end,
URI = emqx_mgmt_api_test_util:api_path(Path),
URI = emqx_mgmt_api_test_util:api_path([Path]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Headers = [{"accept", "text/plain"}, Auth],
case emqx_mgmt_api_test_util:request_api(get, URI, [], Headers, [], #{return_all => true}) of