Merge pull request #13202 from zhongwencool/cluster-fix-cli

feat: add cluster fix command
This commit is contained in:
zhongwencool 2024-07-08 19:08:34 +08:00 committed by GitHub
commit fd18e5feb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 754 additions and 60 deletions

View File

@ -16,6 +16,7 @@
{emqx_conf,1}. {emqx_conf,1}.
{emqx_conf,2}. {emqx_conf,2}.
{emqx_conf,3}. {emqx_conf,3}.
{emqx_conf,4}.
{emqx_connector,1}. {emqx_connector,1}.
{emqx_dashboard,1}. {emqx_dashboard,1}.
{emqx_delayed,1}. {emqx_delayed,1}.

View File

@ -35,6 +35,17 @@
tnx_id :: pos_integer() | '$1' tnx_id :: pos_integer() | '$1'
}). }).
-define(SUGGESTION(Node),
lists:flatten(
io_lib:format(
"run `./bin/emqx_ctl conf cluster_sync fix`"
" on ~p(config leader) to force sync the configs, "
"if this node has been lagging for more than 3 minutes.",
[Node]
)
)
).
-define(READONLY_KEYS, [cluster, rpc, node]). -define(READONLY_KEYS, [cluster, rpc, node]).
-endif. -endif.

View File

@ -28,12 +28,14 @@
reset/0, reset/0,
status/0, status/0,
is_initiator/1, is_initiator/1,
find_leader/0,
skip_failed_commit/1, skip_failed_commit/1,
fast_forward_to_commit/2, fast_forward_to_commit/2,
on_mria_stop/1, on_mria_stop/1,
force_leave_clean/1, force_leave_clean/1,
wait_for_cluster_rpc/0, wait_for_cluster_rpc/0,
maybe_init_tnx_id/2 maybe_init_tnx_id/2,
update_mfa/3
]). ]).
-export([ -export([
commit/2, commit/2,
@ -41,6 +43,7 @@
get_cluster_tnx_id/0, get_cluster_tnx_id/0,
get_node_tnx_id/1, get_node_tnx_id/1,
init_mfa/2, init_mfa/2,
force_sync_tnx_id/3,
latest_tnx_id/0, latest_tnx_id/0,
make_initiate_call_req/3, make_initiate_call_req/3,
read_next_mfa/1, read_next_mfa/1,
@ -227,6 +230,17 @@ status() ->
is_initiator(Opts) -> is_initiator(Opts) ->
?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE). ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE).
find_leader() ->
{atomic, Status} = status(),
case Status of
[#{node := N} | _] ->
N;
[] ->
%% running nodes already sort.
[N | _] = emqx:running_nodes(),
N
end.
%% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560.
on_leave_clean() -> on_leave_clean() ->
on_leave_clean(node()). on_leave_clean(node()).
@ -500,12 +514,14 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) ->
end. end.
stale_view_of_cluster_msg(Meta, Count) -> stale_view_of_cluster_msg(Meta, Count) ->
Node = find_leader(),
Reason = Meta#{ Reason = Meta#{
msg => stale_view_of_cluster_state, msg => stale_view_of_cluster,
retry_times => Count retry_times => Count,
suggestion => ?SUGGESTION(Node)
}, },
?SLOG(warning, Reason), ?SLOG(warning, Reason),
Reason. {error, Reason}.
%% The entry point of a config change transaction. %% The entry point of a config change transaction.
init_mfa(Node, MFA) -> init_mfa(Node, MFA) ->
@ -532,11 +548,41 @@ init_mfa(Node, MFA) ->
{retry, Meta} {retry, Meta}
end. end.
force_sync_tnx_id(Node, MFA, NodeTnxId) ->
mnesia:write_lock_table(?CLUSTER_MFA),
case get_node_tnx_id(Node) of
NodeTnxId ->
TnxId = NodeTnxId + 1,
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
lists:foreach(
fun(N) ->
ok = emqx_cluster_rpc:commit(N, NodeTnxId)
end,
mria:running_nodes()
);
NewTnxId ->
Fmt = "aborted_force_sync, tnx_id(~w) is not the latest(~w)",
Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]),
mnesia:abort({error, Reason})
end.
update_mfa(Node, MFA, LatestId) ->
case transaction(fun ?MODULE:force_sync_tnx_id/3, [Node, MFA, LatestId]) of
{atomic, ok} -> ok;
{aborted, Error} -> Error
end.
transaction(Func, Args) -> transaction(Func, Args) ->
mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
trans_status() -> trans_status() ->
mnesia:foldl( List = mnesia:foldl(
fun(Rec, Acc) -> fun(Rec, Acc) ->
#cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
case mnesia:read(?CLUSTER_MFA, TnxId) of case mnesia:read(?CLUSTER_MFA, TnxId) of
@ -559,8 +605,22 @@ trans_status() ->
end, end,
[], [],
?CLUSTER_COMMIT ?CLUSTER_COMMIT
),
Nodes = mria:running_nodes(),
IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))),
lists:sort(
fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) ->
{IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)}
end,
List
). ).
index_nodes(Node, IndexNodes) ->
case lists:keyfind(Node, 1, IndexNodes) of
false -> 0;
{_, Index} -> Index
end.
trans_query(TnxId) -> trans_query(TnxId) ->
case mnesia:read(?CLUSTER_MFA, TnxId) of case mnesia:read(?CLUSTER_MFA, TnxId) of
[] -> [] ->

View File

@ -94,6 +94,9 @@ del_stale_mfa(MaxHistory) ->
), ),
delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
%% Do nothing when cluster_rpc_commit is empty.
delete_stale_mfa(_, infinity, _Count) ->
ok;
delete_stale_mfa('$end_of_table', _DoneId, _Count) -> delete_stale_mfa('$end_of_table', _DoneId, _Count) ->
ok; ok;
delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->

View File

@ -64,7 +64,7 @@ get_raw(KeyPath) ->
%% @doc Returns all values in the cluster. %% @doc Returns all values in the cluster.
-spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}. -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
get_all(KeyPath) -> get_all(KeyPath) ->
{ResL, []} = emqx_conf_proto_v3:get_all(KeyPath), {ResL, []} = emqx_conf_proto_v4:get_all(KeyPath),
maps:from_list(ResL). maps:from_list(ResL).
%% @doc Returns the specified node's KeyPath, or exception if not found %% @doc Returns the specified node's KeyPath, or exception if not found
@ -72,14 +72,14 @@ get_all(KeyPath) ->
get_by_node(Node, KeyPath) when Node =:= node() -> get_by_node(Node, KeyPath) when Node =:= node() ->
emqx:get_config(KeyPath); emqx:get_config(KeyPath);
get_by_node(Node, KeyPath) -> get_by_node(Node, KeyPath) ->
emqx_conf_proto_v3:get_config(Node, KeyPath). emqx_conf_proto_v4:get_config(Node, KeyPath).
%% @doc Returns the specified node's KeyPath, or the default value if not found %% @doc Returns the specified node's KeyPath, or the default value if not found
-spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term(). -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
get_by_node(Node, KeyPath, Default) when Node =:= node() -> get_by_node(Node, KeyPath, Default) when Node =:= node() ->
emqx:get_config(KeyPath, Default); emqx:get_config(KeyPath, Default);
get_by_node(Node, KeyPath, Default) -> get_by_node(Node, KeyPath, Default) ->
emqx_conf_proto_v3:get_config(Node, KeyPath, Default). emqx_conf_proto_v4:get_config(Node, KeyPath, Default).
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
-spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term(). -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
@ -94,7 +94,7 @@ get_node_and_config(KeyPath) ->
) -> ) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) -> update(KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts). emqx_conf_proto_v4:update(KeyPath, UpdateReq, Opts).
%% @doc Update the specified node's key path in local-override.conf. %% @doc Update the specified node's key path in local-override.conf.
-spec update( -spec update(
@ -107,7 +107,7 @@ update(KeyPath, UpdateReq, Opts) ->
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
update(Node, KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts). emqx_conf_proto_v4:update(Node, KeyPath, UpdateReq, Opts).
%% @doc Mark the specified key path as tombstone %% @doc Mark the specified key path as tombstone
tombstone(KeyPath, Opts) -> tombstone(KeyPath, Opts) ->
@ -117,7 +117,7 @@ tombstone(KeyPath, Opts) ->
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts) -> remove(KeyPath, Opts) ->
emqx_conf_proto_v3:remove_config(KeyPath, Opts). emqx_conf_proto_v4:remove_config(KeyPath, Opts).
%% @doc remove the specified node's key path in local-override.conf. %% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@ -125,13 +125,13 @@ remove(KeyPath, Opts) ->
remove(Node, KeyPath, Opts) when Node =:= node() -> remove(Node, KeyPath, Opts) when Node =:= node() ->
emqx:remove_config(KeyPath, Opts#{override_to => local}); emqx:remove_config(KeyPath, Opts#{override_to => local});
remove(Node, KeyPath, Opts) -> remove(Node, KeyPath, Opts) ->
emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts). emqx_conf_proto_v4:remove_config(Node, KeyPath, Opts).
%% @doc reset all value of key path in cluster-override.conf or local-override.conf. %% @doc reset all value of key path in cluster-override.conf or local-override.conf.
-spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) -> reset(KeyPath, Opts) ->
emqx_conf_proto_v3:reset(KeyPath, Opts). emqx_conf_proto_v4:reset(KeyPath, Opts).
%% @doc reset the specified node's key path in local-override.conf. %% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@ -139,7 +139,7 @@ reset(KeyPath, Opts) ->
reset(Node, KeyPath, Opts) when Node =:= node() -> reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local}); emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) -> reset(Node, KeyPath, Opts) ->
emqx_conf_proto_v3:reset(Node, KeyPath, Opts). emqx_conf_proto_v4:reset(Node, KeyPath, Opts).
%% @doc Called from build script. %% @doc Called from build script.
%% TODO: move to a external escript after all refactoring is done %% TODO: move to a external escript after all refactoring is done

View File

@ -139,7 +139,7 @@ sync_cluster_conf() ->
%% @private Some core nodes are running, try to sync the cluster config from them. %% @private Some core nodes are running, try to sync the cluster config from them.
sync_cluster_conf2(Nodes) -> sync_cluster_conf2(Nodes) ->
{Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), {Results, Failed} = emqx_conf_proto_v4:get_override_config_file(Nodes),
{Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
LogData = #{peer_nodes => Nodes, self_node => node()}, LogData = #{peer_nodes => Nodes, self_node => node()},
case Failed ++ NotReady of case Failed ++ NotReady of
@ -300,7 +300,7 @@ conf_sort({ok, _}, {ok, _}) ->
false. false.
sync_data_from_node(Node) -> sync_data_from_node(Node) ->
case emqx_conf_proto_v3:sync_data_from_node(Node) of case emqx_conf_proto_v4:sync_data_from_node(Node) of
{ok, DataBin} -> {ok, DataBin} ->
case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
{ok, []} -> {ok, []} ->

View File

@ -18,13 +18,15 @@
-include("emqx_conf.hrl"). -include("emqx_conf.hrl").
-include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-export([ -export([
load/0, load/0,
admins/1, admins/1,
conf/1, conf/1,
audit/3, audit/3,
unload/0 unload/0,
mark_fix_log/2
]). ]).
-export([keys/0, get_config/0, get_config/1, load_config/2]). -export([keys/0, get_config/0, get_config/1, load_config/2]).
@ -38,6 +40,7 @@
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
-define(TIMEOUT, 30000).
-dialyzer({no_match, [load/0]}). -dialyzer({no_match, [load/0]}).
@ -90,13 +93,22 @@ admins(["skip", Node0]) ->
Node = list_to_existing_atom(Node0), Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:skip_failed_commit(Node), emqx_cluster_rpc:skip_failed_commit(Node),
status(); status();
admins(["tnxid", TnxId0]) ->
%% changed to 'inspect' in 5.6
%% TODO: delete this clause in 5.7
admins(["inspect", TnxId0]);
admins(["inspect", TnxId0]) -> admins(["inspect", TnxId0]) ->
TnxId = list_to_integer(TnxId0), TnxId = list_to_integer(TnxId0),
print(emqx_cluster_rpc:query(TnxId)); print(emqx_cluster_rpc:query(TnxId));
admins(["fix"]) ->
{atomic, Status} = emqx_cluster_rpc:status(),
case mria_rlog:role() of
core ->
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
maybe_fix_lagging(Status, #{fix => true}),
StoppedNodes =/= [] andalso
emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]),
ok;
Role ->
Leader = emqx_cluster_rpc:find_leader(),
emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role])
end;
admins(["fast_forward"]) -> admins(["fast_forward"]) ->
status(), status(),
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
@ -118,6 +130,38 @@ admins(["fast_forward", Node0, ToTnxId]) ->
admins(_) -> admins(_) ->
emqx_ctl:usage(usage_sync()). emqx_ctl:usage(usage_sync()).
fix_lagging_with_raw(ToTnxId, Node, Keys) ->
Confs = lists:foldl(
fun(Key, Acc) ->
KeyRaw = atom_to_binary(Key),
Acc#{KeyRaw => emqx_conf_proto_v4:get_raw_config(Node, [Key])}
end,
#{},
Keys
),
case mark_fix_begin(Node, ToTnxId) of
ok ->
case load_config_from_raw(Confs, #{mode => replace}) of
ok -> waiting_for_fix_finish();
Error0 -> Error0
end;
{error, Reason} ->
emqx_ctl:warning("mark fix begin failed: ~s~n", [Reason])
end.
mark_fix_begin(Node, TnxId) ->
{atomic, Status} = emqx_cluster_rpc:status(),
MFA = {?MODULE, mark_fix_log, [Status]},
emqx_cluster_rpc:update_mfa(Node, MFA, TnxId).
mark_fix_log(Status, Opts) ->
?SLOG(warning, #{
msg => cluster_config_sync_triggered,
status => emqx_utils:redact(Status),
opts => Opts
}),
ok.
audit(Level, From, Log) -> audit(Level, From, Log) ->
?AUDIT(Level, redact(Log#{from => From})). ?AUDIT(Level, redact(Log#{from => From})).
@ -159,29 +203,72 @@ usage_sync() ->
"WARNING: This results in inconsistent configs among the clustered nodes."}, "WARNING: This results in inconsistent configs among the clustered nodes."},
{"conf cluster_sync fast_forward [node] <ID>", {"conf cluster_sync fast_forward [node] <ID>",
"Fast-forward config change to the given commit ID on the given node.\n" "Fast-forward config change to the given commit ID on the given node.\n"
"WARNING: This results in inconsistent configs among the clustered nodes."} "WARNING: This results in inconsistent configs among the clustered nodes."},
{"conf cluster_sync fix",
"Sync the node with the most comprehensive configuration to other node.\n"
"WARNING: typically the config leader(with the highest tnxid)."}
]. ].
status() -> status() ->
emqx_ctl:print("-----------------------------------------------\n"),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
lists:foreach( status(Status).
fun(S) ->
#{ status(Status) ->
node := Node, emqx_ctl:print("-----------------------------------------------\n"),
tnx_id := TnxId, #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
mfa := {M, F, A}, maybe_fix_lagging(Status, #{fix => false}),
created_at := CreatedAt StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]),
} = S,
emqx_ctl:print(
"~p:[~w] CreatedAt:~p ~p:~p/~w\n",
[Node, TnxId, CreatedAt, M, F, length(A)]
)
end,
Status
),
emqx_ctl:print("-----------------------------------------------\n"). emqx_ctl:print("-----------------------------------------------\n").
maybe_fix_lagging(Status, #{fix := Fix}) ->
%% find inconsistent in conf, but fix in raw way.
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
AllConfs = find_running_confs(),
case find_lagging(Status, AllConfs) of
{inconsistent_tnx_id_key, ToTnxId, Target, InconsistentKeys} when Fix ->
_ = fix_lagging_with_raw(ToTnxId, Target, InconsistentKeys),
ok;
{inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} ->
emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs);
{inconsistent_tnx_id, ToTnxId, Target} when Fix ->
print_tnx_id_status(Status),
case mark_fix_begin(Target, ToTnxId) of
ok ->
waiting_for_fix_finish(),
emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]);
Error ->
Error
end;
{inconsistent_tnx_id, _ToTnxId, _Target} ->
print_tnx_id_status(Status),
Leader = emqx_cluster_rpc:find_leader(),
emqx_ctl:print(?SUGGESTION(Leader));
{inconsistent_key, ToTnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [
ToTnxId
]),
emqx_ctl:warning(
"but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n",
[InconsistentKeys]
),
emqx_ctl:warning(
"Configuring different values (excluding node.name) through environment variables and etc/emqx.conf"
" is allowed but not recommended.~n"
),
Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"),
ok;
{consistent, Msg} ->
emqx_ctl:print(Msg)
end.
print_tnx_id_status(List0) ->
emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"),
List1 = lists:map(fun(#{node := Node, tnx_id := TnxId}) -> {Node, TnxId} end, List0),
emqx_ctl:print("~p~n", [List1]).
print_keys(Keys) -> print_keys(Keys) ->
SortKeys = lists:sort(Keys), SortKeys = lists:sort(Keys),
emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]).
@ -500,8 +587,7 @@ filter_readonly_config(Raw) ->
try try
RawDefault = fill_defaults(Raw), RawDefault = fill_defaults(Raw),
_ = emqx_config:check_config(SchemaMod, RawDefault), _ = emqx_config:check_config(SchemaMod, RawDefault),
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)}
{ok, maps:without(ReadOnlyKeys, Raw)}
catch catch
throw:Error -> throw:Error ->
?SLOG(error, #{ ?SLOG(error, #{
@ -588,3 +674,246 @@ warning(_, Format, Args) -> emqx_ctl:warning(Format, Args).
print(#{log := none}, _, _) -> ok; print(#{log := none}, _, _) -> ok;
print(_, Format, Args) -> emqx_ctl:print(Format, Args). print(_, Format, Args) -> emqx_ctl:print(Format, Args).
waiting_for_fix_finish() ->
timer:sleep(1000),
waiting_for_sync_finish(1).
waiting_for_sync_finish(10) ->
emqx_ctl:warning(
"Config is still not in sync after 10s.~n"
"It may need more time, and check the logs in the lagging nodes.~n"
);
waiting_for_sync_finish(Sec) ->
{atomic, Status} = emqx_cluster_rpc:status(),
case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of
[_] ->
emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]);
_ ->
Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]),
emqx_ctl:warning("sync status: ~p~n", [Res]),
timer:sleep(1000),
waiting_for_sync_finish(Sec + 1)
end.
find_lagging(Status, AllConfs) ->
case find_highest_node(Status) of
{same_tnx_id, TnxId} ->
%% check the conf is the same or not
[{_, TargetConf} | OtherConfs] = AllConfs,
case find_inconsistent_key(TargetConf, OtherConfs) of
[] ->
Msg =
<<"All configuration synchronized(tnx_id=",
(integer_to_binary(TnxId))/binary, ") successfully\n">>,
{consistent, Msg};
InconsistentKeys ->
{inconsistent_key, TnxId, InconsistentKeys}
end;
{ok, TargetId, Target} ->
{value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs),
case find_inconsistent_key(TargetConf, OtherConfs) of
[] -> {inconsistent_tnx_id, TargetId, Target};
ChangedKeys -> {inconsistent_tnx_id_key, TargetId, Target, ChangedKeys}
end
end.
find_inconsistent_key(TargetConf, OtherConfs) ->
lists:usort(
lists:foldl(
fun({_Node, OtherConf}, Changed) ->
lists:filtermap(
fun({K, V}) -> changed(K, V, TargetConf) end,
maps:to_list(OtherConf)
) ++ Changed
end,
[],
OtherConfs
)
).
find_highest_node([]) ->
{same_tnx_id, 0};
find_highest_node(Status) ->
Ids = [{Id, Node} || #{tnx_id := Id, node := Node} <- Status],
case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of
[{TnxId, _}] ->
{same_tnx_id, TnxId};
[{TnxId, Target} | _] ->
{ok, TnxId, Target}
end.
changed(K, V, Conf) ->
case maps:find(K, Conf) of
{ok, V1} when V =:= V1 -> false;
_ -> {true, K}
end.
find_running_confs() ->
lists:map(
fun(Node) ->
Conf = emqx_conf_proto_v4:get_config(Node, []),
{Node, maps:without(?READONLY_KEYS, Conf)}
end,
mria:running_nodes()
).
print_inconsistent_conf(Keys, Target, Status, AllConfs) ->
{value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs),
TargetTnxId = get_tnx_id(Target, Status),
lists:foreach(
fun(Key) ->
lists:foreach(
fun({Node, OtherConf}) ->
TargetV = maps:get(Key, TargetConf),
PrevV = maps:get(Key, OtherConf),
NodeTnxId = get_tnx_id(Node, Status),
Options = #{
key => Key,
node => {Node, NodeTnxId},
target => {Target, TargetTnxId}
},
print_inconsistent_conf(TargetV, PrevV, Options)
end,
OtherConfs
)
end,
Keys
).
get_tnx_id(Node, Status) ->
case lists:filter(fun(#{node := Node0}) -> Node0 =:= Node end, Status) of
[] -> 0;
[#{tnx_id := TnxId}] -> TnxId
end.
print_inconsistent_conf(SameConf, SameConf, _Options) ->
ok;
print_inconsistent_conf(New = #{}, Old = #{}, Options) ->
#{
added := Added,
removed := Removed,
changed := Changed
} = emqx_utils_maps:diff_maps(New, Old),
RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they are still present on ~ts(~w).~n",
print_inconsistent(Removed, RemovedFmt, Options),
AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n",
print_inconsistent(Added, AddedFmt, Options),
ChangedFmt =
"~ts(~w)'s ~s has been updated, but the changes have not been applied to ~ts(~w).~n",
print_inconsistent(Changed, ChangedFmt, Options);
%% authentication rewrite topic_metrics is list(not map).
print_inconsistent_conf(New, Old, Options) ->
#{
key := Key,
target := {Target, TargetTnxId},
node := {Node, NodeTnxId}
} = Options,
emqx_ctl:print("~ts(tnx_id=~w)'s ~s is diff from ~ts(tnx_id=~w).~n", [
Node, NodeTnxId, Key, Target, TargetTnxId
]),
emqx_ctl:print("~ts:~n", [Node]),
print_hocon(Old),
emqx_ctl:print("~ts:~n", [Target]),
print_hocon(New).
print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} ->
#{
key := Key,
target := {Target, TargetTnxId},
node := {Node, NodeTnxId}
} = Options,
emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]),
NodeRawConf = emqx_conf_proto_v4:get_raw_config(Node, [Key]),
TargetRawConf = emqx_conf_proto_v4:get_raw_config(Target, [Key]),
{TargetConf, NodeConf} =
maps:fold(
fun(SubKey, _, {NewAcc, OldAcc}) ->
SubNew0 = maps:get(atom_to_binary(SubKey), NodeRawConf, undefined),
SubOld0 = maps:get(atom_to_binary(SubKey), TargetRawConf, undefined),
{SubNew1, SubOld1} = remove_identical_value(SubNew0, SubOld0),
{NewAcc#{SubKey => SubNew1}, OldAcc#{SubKey => SubOld1}}
end,
{#{}, #{}},
Conf
),
%% zones.default is a virtual zone. It will be changed when mqtt changes,
%% so we can't retrieve the raw data for zones.default(always undefined).
case TargetConf =:= NodeConf of
true -> ok;
false -> print_hocon(#{Target => #{Key => TargetConf}, Node => #{Key => NodeConf}})
end;
print_inconsistent(_Conf, _Format, _Options) ->
ok.
remove_identical_value(New = #{}, Old = #{}) ->
maps:fold(
fun(K, NewV, {Acc1, Acc2}) ->
case maps:find(K, Old) of
{ok, NewV} ->
{maps:remove(K, Acc1), maps:remove(K, Acc2)};
{ok, OldV} ->
{NewV1, OldV1} = remove_identical_value(NewV, OldV),
{maps:put(K, NewV1, Acc1), maps:put(K, OldV1, Acc2)}
end
end,
{New, Old},
New
);
remove_identical_value(New, Old) ->
{New, Old}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
find_inconsistent_test() ->
Status = [
#{node => 'node1', tnx_id => 1},
#{node => 'node2', tnx_id => 3},
#{node => 'node3', tnx_id => 2}
],
Stats = #{<<"enable">> => false},
%% chose the highest tnx_id node
Mqtt = #{
<<"await_rel_timeout">> => <<"300s">>,
<<"exclusive_subscription">> => false,
<<"idle_timeout">> => <<"15s">>
},
TargetMqtt1 = Mqtt#{<<"idle_timeout">> => <<"16s">>},
Confs0 = [
{node1, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"11s">>}, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"17s">>}, <<"stats">> => Stats}},
{node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}}
],
?assertEqual(
{inconsistent_tnx_id_key, 3, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0)
),
%% conf is the same, no changed
NoDiffConfs = [
{node1, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}
],
?assertEqual({inconsistent_tnx_id, 3, node2}, find_lagging(Status, NoDiffConfs)),
%% same tnx_id
SameStatus = [
#{node => 'node1', tnx_id => 3},
#{node => 'node2', tnx_id => 3},
#{node => 'node3', tnx_id => 3}
],
%% same conf
?assertEqual(
{consistent, <<"All configuration synchronized(tnx_id=3) successfully\n">>},
find_lagging(SameStatus, NoDiffConfs)
),
%% diff conf same tnx_id use the first one
?assertEqual(
{inconsistent_key, 3, [<<"mqtt">>]},
find_lagging(SameStatus, Confs0)
),
ok.
-endif.

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
sync_data_from_node/1, sync_data_from_node/1,
get_config/2, get_config/2,
get_config/3, get_config/3,
@ -43,6 +44,9 @@
introduced_in() -> introduced_in() ->
"5.1.1". "5.1.1".
deprecated_since() ->
"5.7.1".
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
sync_data_from_node(Node) -> sync_data_from_node(Node) ->
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).

View File

@ -0,0 +1,124 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_proto_v4).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
sync_data_from_node/1,
get_config/2,
get_config/3,
get_all/1,
update/3,
update/4,
remove_config/2,
remove_config/3,
reset/2,
reset/3,
get_override_config_file/1
]).
-export([get_hocon_config/1, get_hocon_config/2]).
-export([get_raw_config/2]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.7.1".
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
sync_data_from_node(Node) ->
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...].
-spec get_config(node(), emqx_utils_maps:config_key_path()) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_config, [KeyPath]).
-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath, Default) ->
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result().
get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(
node(),
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
reset(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
get_override_config_file(Nodes) ->
rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
-spec get_hocon_config(node()) -> map() | {badrpc, _}.
get_hocon_config(Node) ->
rpc:call(Node, emqx_conf_cli, get_config, []).
-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}.
get_raw_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_raw_config, [KeyPath]).
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
get_hocon_config(Node, Key) ->
rpc:call(Node, emqx_conf_cli, get_config, [Key]).

View File

@ -141,13 +141,15 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
Res1 = gen_server:call(?NODE2, Call), Res1 = gen_server:call(?NODE2, Call),
Res2 = gen_server:call(?NODE3, Call), Res2 = gen_server:call(?NODE3, Call),
%% Node2 is retry on tnx_id 1, and should not run Next MFA. %% Node2 is retry on tnx_id 1, and should not run Next MFA.
?assertEqual( ?assertMatch(
{init_failure, #{ {init_failure,
msg => stale_view_of_cluster_state, {error, #{
retry_times => 2, msg := stale_view_of_cluster,
cluster_tnx_id => 2, retry_times := 2,
node_tnx_id => 1 cluster_tnx_id := 2,
}}, node_tnx_id := 1,
suggestion := _
}}},
Res1 Res1
), ),
?assertEqual(Res1, Res2), ?assertEqual(Res1, Res2),

View File

@ -0,0 +1,158 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_cluster_sync_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_conf.hrl").
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
WorkDir = ?config(priv_dir, Config),
Cluster = mk_cluster_spec(#{}),
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
[{cluster_nodes, Nodes} | Config].
end_per_suite(Config) ->
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)).
t_fix(Config) ->
[Node1, Node2] = ?config(cluster_nodes, Config),
?ON(Node1, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())),
?ON(Node2, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())),
?ON(Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 100}, #{})),
?assertEqual(100, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
?assertEqual(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
?ON(
Node1,
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 1},
#{node := Node1, tnx_id := 1}
]},
emqx_cluster_rpc:status()
)
),
%% fix normal, nothing changed
?ON(Node1, begin
ok = emqx_conf_cli:admins(["fix"]),
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 1},
#{node := Node1, tnx_id := 1}
]},
emqx_cluster_rpc:status()
)
end),
%% fix inconsistent_key. tnx_id is the same, so nothing changed.
emqx_conf_proto_v4:update(Node1, [<<"mqtt">>], #{<<"max_topic_levels">> => 99}, #{}),
?ON(Node1, begin
ok = emqx_conf_cli:admins(["fix"]),
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 1},
#{node := Node1, tnx_id := 1}
]},
emqx_cluster_rpc:status()
)
end),
?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
?assertMatch(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
%% fix inconsistent_tnx_id_key. tnx_id and key are updated.
?ON(Node1, fake_mfa(2, Node1, {?MODULE, undef, []})),
%% 2 -> fake_mfa, 3-> mark_begin_log, 4-> mqtt 5 -> zones
?ON(Node2, begin
ok = emqx_conf_cli:admins(["fix"]),
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 5},
#{node := Node1, tnx_id := 5}
]},
emqx_cluster_rpc:status()
)
end),
?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
?assertMatch(99, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
%% fix inconsistent_tnx_id. tnx_id is updated.
{ok, _} = ?ON(
Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 98}, #{})
),
?ON(Node2, fake_mfa(7, Node2, {?MODULE, undef1, []})),
?ON(Node1, begin
ok = emqx_conf_cli:admins(["fix"]),
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 8},
#{node := Node1, tnx_id := 8}
]},
emqx_cluster_rpc:status()
)
end),
?assertMatch(98, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
?assertMatch(98, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
%% unchanged
?ON(Node1, begin
ok = emqx_conf_cli:admins(["fix"]),
?assertMatch(
{atomic, [
#{node := Node2, tnx_id := 8},
#{node := Node1, tnx_id := 8}
]},
emqx_cluster_rpc:status()
)
end),
ok.
fake_mfa(TnxId, Node, MFA) ->
Func = fun() ->
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = emqx_cluster_rpc:commit(Node, TnxId)
end,
{atomic, ok} = mria:transaction(?CLUSTER_RPC_SHARD, Func, []),
ok.
mk_cluster_spec(Opts) ->
Conf = #{
listeners => #{
tcp => #{default => <<"marked_for_deletion">>},
ssl => #{default => <<"marked_for_deletion">>},
ws => #{default => <<"marked_for_deletion">>},
wss => #{default => <<"marked_for_deletion">>}
}
},
Apps = [
{emqx, #{config => Conf}},
{emqx_conf, #{config => Conf}}
],
[
{emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Apps}},
{emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Apps}}
].

View File

@ -314,15 +314,15 @@ global_zone_configs(get, _Params, _Req) ->
{200, get_zones()}; {200, get_zones()};
global_zone_configs(put, #{body := Body}, _Req) -> global_zone_configs(put, #{body := Body}, _Req) ->
PrevZones = get_zones(), PrevZones = get_zones(),
Res = {Res, Error} =
maps:fold( maps:fold(
fun(Path, Value, Acc) -> fun(Path, Value, {Acc, Error}) ->
PrevValue = maps:get(Path, PrevZones), PrevValue = maps:get(Path, PrevZones),
case Value =/= PrevValue of case Value =/= PrevValue of
true -> true ->
case emqx_conf:update([Path], Value, ?OPTS) of case emqx_conf:update([Path], Value, ?OPTS) of
{ok, #{raw_config := RawConf}} -> {ok, #{raw_config := RawConf}} ->
Acc#{Path => RawConf}; {Acc#{Path => RawConf}, Error};
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "update_global_zone_failed", msg => "update_global_zone_failed",
@ -330,18 +330,18 @@ global_zone_configs(put, #{body := Body}, _Req) ->
path => Path, path => Path,
value => Value value => Value
}), }),
Acc {Acc, Error#{Path => Reason}}
end; end;
false -> false ->
Acc#{Path => Value} {Acc#{Path => Value}, Error}
end end
end, end,
#{}, {#{}, #{}},
Body Body
), ),
case maps:size(Res) =:= maps:size(Body) of case maps:size(Res) =:= maps:size(Body) of
true -> {200, Res}; true -> {200, Res};
false -> {400, #{code => 'UPDATE_FAILED'}} false -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Error)}}
end. end.
config_reset(post, _Params, Req) -> config_reset(post, _Params, Req) ->
@ -428,9 +428,9 @@ get_configs_v2(QueryStr) ->
Conf = Conf =
case maps:find(<<"key">>, QueryStr) of case maps:find(<<"key">>, QueryStr) of
error -> error ->
emqx_conf_proto_v3:get_hocon_config(Node); emqx_conf_proto_v4:get_hocon_config(Node);
{ok, Key} -> {ok, Key} ->
emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key)) emqx_conf_proto_v4:get_hocon_config(Node, atom_to_binary(Key))
end, end,
{ {
200, 200,

View File

@ -44,7 +44,8 @@
pem_cache/1, pem_cache/1,
olp/1, olp/1,
data/1, data/1,
ds/1 ds/1,
cluster_info/0
]). ]).
-spec load() -> ok. -spec load() -> ok.
@ -53,7 +54,7 @@ load() ->
lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
is_cmd(Fun) -> is_cmd(Fun) ->
not lists:member(Fun, [init, load, module_info]). not lists:member(Fun, [init, load, module_info, cluster_info]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Node status %% @doc Node status

View File

@ -240,11 +240,11 @@ t_configs_node({'init', Config}) ->
(bad_node, _) -> {badrpc, bad} (bad_node, _) -> {badrpc, bad}
end, end,
meck:expect(emqx_management_proto_v5, get_full_config, F), meck:expect(emqx_management_proto_v5, get_full_config, F),
meck:expect(emqx_conf_proto_v3, get_hocon_config, F2), meck:expect(emqx_conf_proto_v4, get_hocon_config, F2),
meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end), meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
Config; Config;
t_configs_node({'end', _}) -> t_configs_node({'end', _}) ->
meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]); meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v4, hocon_pp]);
t_configs_node(_) -> t_configs_node(_) ->
Node = atom_to_list(node()), Node = atom_to_list(node()),

View File

@ -0,0 +1 @@
Introduce `emqx_cli conf cluster_sync fix` to address cluster inconsistencies. It synchronizes the configuration of the node with the largest `tnx_id` to all nodes.