chore: move emqx_conf_proto_v3 to emqx_conf_proto_v4
This commit is contained in:
parent
3ed4340145
commit
5b105fcdbb
|
@ -16,6 +16,7 @@
|
|||
{emqx_conf,1}.
|
||||
{emqx_conf,2}.
|
||||
{emqx_conf,3}.
|
||||
{emqx_conf,4}.
|
||||
{emqx_connector,1}.
|
||||
{emqx_dashboard,1}.
|
||||
{emqx_delayed,1}.
|
||||
|
|
|
@ -64,7 +64,7 @@ get_raw(KeyPath) ->
|
|||
%% @doc Returns all values in the cluster.
|
||||
-spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
|
||||
get_all(KeyPath) ->
|
||||
{ResL, []} = emqx_conf_proto_v3:get_all(KeyPath),
|
||||
{ResL, []} = emqx_conf_proto_v4:get_all(KeyPath),
|
||||
maps:from_list(ResL).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or exception if not found
|
||||
|
@ -72,14 +72,14 @@ get_all(KeyPath) ->
|
|||
get_by_node(Node, KeyPath) when Node =:= node() ->
|
||||
emqx:get_config(KeyPath);
|
||||
get_by_node(Node, KeyPath) ->
|
||||
emqx_conf_proto_v3:get_config(Node, KeyPath).
|
||||
emqx_conf_proto_v4:get_config(Node, KeyPath).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or the default value if not found
|
||||
-spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
|
||||
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
|
||||
emqx:get_config(KeyPath, Default);
|
||||
get_by_node(Node, KeyPath, Default) ->
|
||||
emqx_conf_proto_v3:get_config(Node, KeyPath, Default).
|
||||
emqx_conf_proto_v4:get_config(Node, KeyPath, Default).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
|
||||
-spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
|
||||
|
@ -94,7 +94,7 @@ get_node_and_config(KeyPath) ->
|
|||
) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update(KeyPath, UpdateReq, Opts) ->
|
||||
emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts).
|
||||
emqx_conf_proto_v4:update(KeyPath, UpdateReq, Opts).
|
||||
|
||||
%% @doc Update the specified node's key path in local-override.conf.
|
||||
-spec update(
|
||||
|
@ -107,7 +107,7 @@ update(KeyPath, UpdateReq, Opts) ->
|
|||
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
|
||||
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
|
||||
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||
emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts).
|
||||
emqx_conf_proto_v4:update(Node, KeyPath, UpdateReq, Opts).
|
||||
|
||||
%% @doc Mark the specified key path as tombstone
|
||||
tombstone(KeyPath, Opts) ->
|
||||
|
@ -117,7 +117,7 @@ tombstone(KeyPath, Opts) ->
|
|||
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
remove(KeyPath, Opts) ->
|
||||
emqx_conf_proto_v3:remove_config(KeyPath, Opts).
|
||||
emqx_conf_proto_v4:remove_config(KeyPath, Opts).
|
||||
|
||||
%% @doc remove the specified node's key path in local-override.conf.
|
||||
-spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||
|
@ -125,13 +125,13 @@ remove(KeyPath, Opts) ->
|
|||
remove(Node, KeyPath, Opts) when Node =:= node() ->
|
||||
emqx:remove_config(KeyPath, Opts#{override_to => local});
|
||||
remove(Node, KeyPath, Opts) ->
|
||||
emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts).
|
||||
emqx_conf_proto_v4:remove_config(Node, KeyPath, Opts).
|
||||
|
||||
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
|
||||
-spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
reset(KeyPath, Opts) ->
|
||||
emqx_conf_proto_v3:reset(KeyPath, Opts).
|
||||
emqx_conf_proto_v4:reset(KeyPath, Opts).
|
||||
|
||||
%% @doc reset the specified node's key path in local-override.conf.
|
||||
-spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||
|
@ -139,7 +139,7 @@ reset(KeyPath, Opts) ->
|
|||
reset(Node, KeyPath, Opts) when Node =:= node() ->
|
||||
emqx:reset_config(KeyPath, Opts#{override_to => local});
|
||||
reset(Node, KeyPath, Opts) ->
|
||||
emqx_conf_proto_v3:reset(Node, KeyPath, Opts).
|
||||
emqx_conf_proto_v4:reset(Node, KeyPath, Opts).
|
||||
|
||||
%% @doc Called from build script.
|
||||
%% TODO: move to a external escript after all refactoring is done
|
||||
|
|
|
@ -139,7 +139,7 @@ sync_cluster_conf() ->
|
|||
|
||||
%% @private Some core nodes are running, try to sync the cluster config from them.
|
||||
sync_cluster_conf2(Nodes) ->
|
||||
{Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
|
||||
{Results, Failed} = emqx_conf_proto_v4:get_override_config_file(Nodes),
|
||||
{Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
||||
LogData = #{peer_nodes => Nodes, self_node => node()},
|
||||
case Failed ++ NotReady of
|
||||
|
@ -300,7 +300,7 @@ conf_sort({ok, _}, {ok, _}) ->
|
|||
false.
|
||||
|
||||
sync_data_from_node(Node) ->
|
||||
case emqx_conf_proto_v3:sync_data_from_node(Node) of
|
||||
case emqx_conf_proto_v4:sync_data_from_node(Node) of
|
||||
{ok, DataBin} ->
|
||||
case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
|
||||
{ok, []} ->
|
||||
|
|
|
@ -145,7 +145,7 @@ admins(_) ->
|
|||
emqx_ctl:usage(usage_sync()).
|
||||
|
||||
fix_inconsistent_with_raw(Node, Keys) ->
|
||||
Confs = [#{Key => emqx_conf_proto_v3:get_raw_config(Node, Key)} || Key <- Keys],
|
||||
Confs = [#{Key => emqx_conf_proto_v4:get_raw_config(Node, Key)} || Key <- Keys],
|
||||
ok = emqx_cluster_rpc:reset(),
|
||||
case load_config_from_raw(Confs, #{mode => replace}) of
|
||||
ok -> waiting_for_fix_finish();
|
||||
|
@ -719,7 +719,7 @@ changed(K, V, Conf) ->
|
|||
find_running_confs() ->
|
||||
lists:map(
|
||||
fun(Node) ->
|
||||
Conf = emqx_conf_proto_v3:get_config(Node, []),
|
||||
Conf = emqx_conf_proto_v4:get_config(Node, []),
|
||||
{Node, maps:without(?READONLY_KEYS, Conf)}
|
||||
end,
|
||||
mria:running_nodes()
|
||||
|
@ -788,8 +788,8 @@ print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} ->
|
|||
node := {Node, NodeTnxId}
|
||||
} = Options,
|
||||
emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]),
|
||||
NodeRawConf = emqx_conf_proto_v3:get_raw_config(Node, [Key]),
|
||||
TargetRawConf = emqx_conf_proto_v3:get_raw_config(Target, [Key]),
|
||||
NodeRawConf = emqx_conf_proto_v4:get_raw_config(Node, [Key]),
|
||||
TargetRawConf = emqx_conf_proto_v4:get_raw_config(Target, [Key]),
|
||||
{TargetConf, NodeConf} =
|
||||
maps:fold(
|
||||
fun(SubKey, _, {NewAcc, OldAcc}) ->
|
||||
|
|
|
@ -37,7 +37,6 @@
|
|||
]).
|
||||
|
||||
-export([get_hocon_config/1, get_hocon_config/2]).
|
||||
-export([get_raw_config/2]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
|
@ -115,10 +114,6 @@ get_override_config_file(Nodes) ->
|
|||
get_hocon_config(Node) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, []).
|
||||
|
||||
-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}.
|
||||
get_raw_config(Node, KeyPath) ->
|
||||
rpc:call(Node, emqx, get_raw_config, [KeyPath]).
|
||||
|
||||
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
|
||||
get_hocon_config(Node, Key) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, [Key]).
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_conf_proto_v4).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
sync_data_from_node/1,
|
||||
get_config/2,
|
||||
get_config/3,
|
||||
get_all/1,
|
||||
|
||||
update/3,
|
||||
update/4,
|
||||
remove_config/2,
|
||||
remove_config/3,
|
||||
|
||||
reset/2,
|
||||
reset/3,
|
||||
|
||||
get_override_config_file/1
|
||||
]).
|
||||
|
||||
-export([get_hocon_config/1, get_hocon_config/2]).
|
||||
-export([get_raw_config/2]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.7.1".
|
||||
|
||||
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||
sync_data_from_node(Node) ->
|
||||
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
|
||||
-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...].
|
||||
|
||||
-spec get_config(node(), emqx_utils_maps:config_key_path()) ->
|
||||
term() | emqx_rpc:badrpc().
|
||||
get_config(Node, KeyPath) ->
|
||||
rpc:call(Node, emqx, get_config, [KeyPath]).
|
||||
|
||||
-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) ->
|
||||
term() | emqx_rpc:badrpc().
|
||||
get_config(Node, KeyPath, Default) ->
|
||||
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
|
||||
|
||||
-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result().
|
||||
get_all(KeyPath) ->
|
||||
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
|
||||
|
||||
-spec update(
|
||||
update_config_key_path(),
|
||||
emqx_config:update_request(),
|
||||
emqx_config:update_opts()
|
||||
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update(KeyPath, UpdateReq, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
|
||||
|
||||
-spec update(
|
||||
node(),
|
||||
update_config_key_path(),
|
||||
emqx_config:update_request(),
|
||||
emqx_config:update_opts()
|
||||
) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
|
||||
|
||||
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
remove_config(KeyPath, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
|
||||
|
||||
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
remove_config(Node, KeyPath, Opts) ->
|
||||
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
|
||||
|
||||
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
reset(KeyPath, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
|
||||
|
||||
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
reset(Node, KeyPath, Opts) ->
|
||||
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
|
||||
|
||||
-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
|
||||
get_override_config_file(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
|
||||
|
||||
-spec get_hocon_config(node()) -> map() | {badrpc, _}.
|
||||
get_hocon_config(Node) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, []).
|
||||
|
||||
-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}.
|
||||
get_raw_config(Node, KeyPath) ->
|
||||
rpc:call(Node, emqx, get_raw_config, [KeyPath]).
|
||||
|
||||
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
|
||||
get_hocon_config(Node, Key) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, [Key]).
|
|
@ -144,7 +144,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
|
|||
?assertMatch(
|
||||
{init_failure,
|
||||
{error, #{
|
||||
msg := stale_view_of_cluster_state,
|
||||
msg := stale_view_of_cluster,
|
||||
retry_times := 2,
|
||||
cluster_tnx_id := 2,
|
||||
node_tnx_id := 1,
|
||||
|
|
|
@ -428,9 +428,9 @@ get_configs_v2(QueryStr) ->
|
|||
Conf =
|
||||
case maps:find(<<"key">>, QueryStr) of
|
||||
error ->
|
||||
emqx_conf_proto_v3:get_hocon_config(Node);
|
||||
emqx_conf_proto_v4:get_hocon_config(Node);
|
||||
{ok, Key} ->
|
||||
emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key))
|
||||
emqx_conf_proto_v4:get_hocon_config(Node, atom_to_binary(Key))
|
||||
end,
|
||||
{
|
||||
200,
|
||||
|
|
|
@ -240,11 +240,11 @@ t_configs_node({'init', Config}) ->
|
|||
(bad_node, _) -> {badrpc, bad}
|
||||
end,
|
||||
meck:expect(emqx_management_proto_v5, get_full_config, F),
|
||||
meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
|
||||
meck:expect(emqx_conf_proto_v4, get_hocon_config, F2),
|
||||
meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
|
||||
Config;
|
||||
t_configs_node({'end', _}) ->
|
||||
meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]);
|
||||
meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v4, hocon_pp]);
|
||||
t_configs_node(_) ->
|
||||
Node = atom_to_list(node()),
|
||||
|
||||
|
|
Loading…
Reference in New Issue