feat: add cluster fix command

This commit is contained in:
zhongwencool 2024-06-07 09:19:41 +08:00
parent c8258cebe8
commit 2069910ad1
6 changed files with 335 additions and 32 deletions

View File

@ -501,11 +501,12 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) ->
stale_view_of_cluster_msg(Meta, Count) -> stale_view_of_cluster_msg(Meta, Count) ->
Reason = Meta#{ Reason = Meta#{
msg => stale_view_of_cluster_state, msg => stale_view_of_cluster,
retry_times => Count retry_times => Count,
suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time"
}, },
?SLOG(warning, Reason), ?SLOG(warning, Reason),
Reason. {error, Reason}.
%% The entry point of a config change transaction. %% The entry point of a config change transaction.
init_mfa(Node, MFA) -> init_mfa(Node, MFA) ->

View File

@ -94,6 +94,9 @@ del_stale_mfa(MaxHistory) ->
), ),
delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
%% Do nothing when cluster_rpc_commit is empty.
delete_stale_mfa(_, infinity, _Count) ->
ok;
delete_stale_mfa('$end_of_table', _DoneId, _Count) -> delete_stale_mfa('$end_of_table', _DoneId, _Count) ->
ok; ok;
delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->

View File

@ -18,6 +18,7 @@
-include("emqx_conf.hrl"). -include("emqx_conf.hrl").
-include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-export([ -export([
load/0, load/0,
@ -38,6 +39,8 @@
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
-define(TIMEOUT, 30000).
-dialyzer({no_match, [load/0]}). -dialyzer({no_match, [load/0]}).
@ -90,13 +93,35 @@ admins(["skip", Node0]) ->
Node = list_to_existing_atom(Node0), Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:skip_failed_commit(Node), emqx_cluster_rpc:skip_failed_commit(Node),
status(); status();
admins(["tnxid", TnxId0]) ->
%% changed to 'inspect' in 5.6
%% TODO: delete this clause in 5.7
admins(["inspect", TnxId0]);
admins(["inspect", TnxId0]) -> admins(["inspect", TnxId0]) ->
TnxId = list_to_integer(TnxId0), TnxId = list_to_integer(TnxId0),
print(emqx_cluster_rpc:query(TnxId)); print(emqx_cluster_rpc:query(TnxId));
admins(["fix"]) ->
{atomic, Status} = emqx_cluster_rpc:status(),
%% find inconsistent in conf, but fix in raw way.
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
AllConfs = find_running_confs(),
case find_inconsistent(Status, AllConfs) of
{inconsistent_tnx_id_key, Target, InconsistentKeys} ->
fix_inconsistent_with_raw(Target, InconsistentKeys);
inconsistent_tnx_id ->
print_tnx_id_status(Status),
ok = emqx_cluster_rpc:reset(),
emqx_ctl:print("Reset tnxid to 0 successfully~n");
{inconsistent_key, TnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]),
emqx_ctl:warning(
"but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n",
[InconsistentKeys]
),
emqx_ctl:warning("This is normal. This fix will not make any changes.~n");
{error, Reason} ->
emqx_ctl:print(Reason)
end,
StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]);
admins(["fast_forward"]) -> admins(["fast_forward"]) ->
status(), status(),
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
@ -118,6 +143,14 @@ admins(["fast_forward", Node0, ToTnxId]) ->
admins(_) -> admins(_) ->
emqx_ctl:usage(usage_sync()). emqx_ctl:usage(usage_sync()).
fix_inconsistent_with_raw(Node, Keys) ->
Confs = [#{Key => emqx_conf_proto_v3:get_raw_config(Node, Key)} || Key <- Keys],
ok = emqx_cluster_rpc:reset(),
case load_config_from_raw(Confs, #{mode => replace}) of
ok -> waiting_for_fix_finish();
Error -> Error
end.
audit(Level, From, Log) -> audit(Level, From, Log) ->
?AUDIT(Level, redact(Log#{from => From})). ?AUDIT(Level, redact(Log#{from => From})).
@ -159,29 +192,52 @@ usage_sync() ->
"WARNING: This results in inconsistent configs among the clustered nodes."}, "WARNING: This results in inconsistent configs among the clustered nodes."},
{"conf cluster_sync fast_forward [node] <ID>", {"conf cluster_sync fast_forward [node] <ID>",
"Fast-forward config change to the given commit ID on the given node.\n" "Fast-forward config change to the given commit ID on the given node.\n"
"WARNING: This results in inconsistent configs among the clustered nodes."} "WARNING: This results in inconsistent configs among the clustered nodes."},
{"conf cluster_sync fix",
"Sync the node with the most comprehensive configuration to other node.\n"
"WARNING: typically the one with the highest tnxid."}
]. ].
status() -> status() ->
emqx_ctl:print("-----------------------------------------------\n"),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
lists:foreach( status(Status).
fun(S) ->
#{ status(Status) ->
node := Node, emqx_ctl:print("-----------------------------------------------\n"),
tnx_id := TnxId, #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
mfa := {M, F, A}, AllConfs = find_running_confs(),
created_at := CreatedAt case find_inconsistent(Status, AllConfs) of
} = S, {inconsistent_tnx_id_key, TargetNode, InconsistentKeys} ->
emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs);
inconsistent_tnx_id ->
print_tnx_id_status(Status),
emqx_ctl:print( emqx_ctl:print(
"~p:[~w] CreatedAt:~p ~p:~p/~w\n", "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time"
[Node, TnxId, CreatedAt, M, F, length(A)] );
) {inconsistent_key, TnxId, InconsistentKeys} ->
end, [{Target, _} | _] = AllConfs,
Status print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
), emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]),
emqx_ctl:warning(
"but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n",
[InconsistentKeys]
),
emqx_ctl:warning(
"Configuring different values (excluding node.name) through environment variables and etc/emqx.conf"
" is allowed but not recommended.~n"
);
{error, Reason} ->
emqx_ctl:print(Reason)
end,
StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]),
emqx_ctl:print("-----------------------------------------------\n"). emqx_ctl:print("-----------------------------------------------\n").
print_tnx_id_status(List0) ->
emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"),
List1 = lists:map(fun(#{node := Node, tnx_id := TnxId}) -> {Node, TnxId} end, List0),
emqx_ctl:print("~p~n", [List1]).
print_keys(Keys) -> print_keys(Keys) ->
SortKeys = lists:sort(Keys), SortKeys = lists:sort(Keys),
emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]).
@ -500,8 +556,7 @@ filter_readonly_config(Raw) ->
try try
RawDefault = fill_defaults(Raw), RawDefault = fill_defaults(Raw),
_ = emqx_config:check_config(SchemaMod, RawDefault), _ = emqx_config:check_config(SchemaMod, RawDefault),
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)}
{ok, maps:without(ReadOnlyKeys, Raw)}
catch catch
throw:Error -> throw:Error ->
?SLOG(error, #{ ?SLOG(error, #{
@ -588,3 +643,240 @@ warning(_, Format, Args) -> emqx_ctl:warning(Format, Args).
print(#{log := none}, _, _) -> ok; print(#{log := none}, _, _) -> ok;
print(_, Format, Args) -> emqx_ctl:print(Format, Args). print(_, Format, Args) -> emqx_ctl:print(Format, Args).
waiting_for_fix_finish() ->
timer:sleep(1000),
waiting_for_sync_finish(1).
waiting_for_sync_finish(10) ->
emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n");
waiting_for_sync_finish(Sec) ->
{atomic, Status} = emqx_cluster_rpc:status(),
case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of
[_] ->
emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]);
_ ->
Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]),
emqx_ctl:warning("sync: ~n", [Res]),
timer:sleep(1000),
waiting_for_sync_finish(Sec + 1)
end.
find_inconsistent(Status, AllConfs) ->
case find_highest_node(Status) of
{same_tnx_id, TnxId} ->
%% check the conf is the same or not
[{_, TargetConf} | OtherConfs] = AllConfs,
case find_inconsistent_key(TargetConf, OtherConfs) of
[] ->
Msg =
<<"All configuration has already been synchronized(",
(integer_to_binary(TnxId))/binary, ") successfully\n">>,
{error, Msg};
InconsistentKeys ->
{inconsistent_key, TnxId, InconsistentKeys}
end;
{ok, Target} ->
{value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs),
case find_inconsistent_key(TargetConf, OtherConfs) of
[] -> inconsistent_tnx_id;
ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys}
end
end.
find_inconsistent_key(TargetConf, OtherConfs) ->
lists:usort(
lists:foldl(
fun({_Node, OtherConf}, Changed) ->
lists:filtermap(
fun({K, V}) -> changed(K, V, TargetConf) end,
maps:to_list(OtherConf)
) ++ Changed
end,
[],
OtherConfs
)
).
find_highest_node([]) ->
{same_tnx_id, 0};
find_highest_node(Status) ->
Ids = [{Id, Node} || #{tnx_id := Id, node := Node} <- Status],
case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of
[{TnxId, _}] ->
{same_tnx_id, TnxId};
[{_TnxId, Target} | _] ->
{ok, Target}
end.
changed(K, V, Conf) ->
case maps:find(K, Conf) of
{ok, V1} when V =:= V1 -> false;
_ -> {true, K}
end.
find_running_confs() ->
lists:map(
fun(Node) ->
Conf = emqx_conf_proto_v3:get_config(Node, []),
{Node, maps:without(?READONLY_KEYS, Conf)}
end,
mria:running_nodes()
).
print_inconsistent_conf(Keys, Target, Status, AllConfs) ->
{value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs),
TargetTnxId = get_tnx_id(Target, Status),
lists:foreach(
fun(Key) ->
lists:foreach(
fun({Node, OtherConf}) ->
TargetV = maps:get(Key, TargetConf),
PrevV = maps:get(Key, OtherConf),
NodeTnxId = get_tnx_id(Node, Status),
Options = #{
key => Key,
node => {Node, NodeTnxId},
target => {Target, TargetTnxId}
},
print_inconsistent_conf(TargetV, PrevV, Options)
end,
OtherConfs
)
end,
Keys
).
get_tnx_id(Node, Status) ->
case lists:filter(fun(#{node := Node0}) -> Node0 =:= Node end, Status) of
[] -> 0;
[#{tnx_id := TnxId}] -> TnxId
end.
print_inconsistent_conf(SameConf, SameConf, _Options) ->
ok;
print_inconsistent_conf(New = #{}, Old = #{}, Options) ->
#{
added := Added,
removed := Removed,
changed := Changed
} = emqx_utils_maps:diff_maps(New, Old),
RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n",
print_inconsistent(Removed, RemovedFmt, Options),
AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n",
print_inconsistent(Added, AddedFmt, Options),
ChangedFmt =
"~ts(~w)'s ~s has been updated, but the changes have not been applied to ~ts(~w).~n",
print_inconsistent(Changed, ChangedFmt, Options);
%% authentication rewrite topic_metrics is list(not map).
print_inconsistent_conf(New, Old, Options) ->
#{
key := Key,
target := {Target, TargetTnxId},
node := {Node, NodeTnxId}
} = Options,
emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [
Node, NodeTnxId, Key, Target, TargetTnxId
]),
print_hocon(#{Node => Old, Target => New}).
print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} ->
#{
key := Key,
target := {Target, TargetTnxId},
node := {Node, NodeTnxId}
} = Options,
emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]),
NodeRawConf = emqx_conf_proto_v3:get_raw_config(Node, [Key]),
TargetRawConf = emqx_conf_proto_v3:get_raw_config(Target, [Key]),
{TargetConf, NodeConf} =
maps:fold(
fun(SubKey, _, {NewAcc, OldAcc}) ->
SubNew0 = maps:get(atom_to_binary(SubKey), NodeRawConf, undefined),
SubOld0 = maps:get(atom_to_binary(SubKey), TargetRawConf, undefined),
{SubNew1, SubOld1} = remove_identical_value(SubNew0, SubOld0),
{NewAcc#{SubKey => SubNew1}, OldAcc#{SubKey => SubOld1}}
end,
{#{}, #{}},
Conf
),
%% zones.default is a virtual zone. It will be changed when mqtt changes,
%% so we can't retrieve the raw data for zones.default(always undefined).
case TargetConf =:= NodeConf of
true -> ok;
false -> print_hocon(#{Target => #{Key => TargetConf}, Node => #{Key => NodeConf}})
end;
print_inconsistent(_Conf, _Format, _Options) ->
ok.
remove_identical_value(New = #{}, Old = #{}) ->
maps:fold(
fun(K, NewV, {Acc1, Acc2}) ->
case maps:find(K, Old) of
{ok, NewV} ->
{maps:remove(K, Acc1), maps:remove(K, Acc2)};
{ok, OldV} ->
{NewV1, OldV1} = remove_identical_value(NewV, OldV),
{maps:put(K, NewV1, Acc1), maps:put(K, OldV1, Acc2)}
end
end,
{New, Old},
New
);
remove_identical_value(New, Old) ->
{New, Old}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
find_inconsistent_test() ->
Status = [
#{node => 'node1', tnx_id => 1},
#{node => 'node2', tnx_id => 3},
#{node => 'node3', tnx_id => 2}
],
Stats = #{<<"enable">> => false},
%% chose the highest tnx_id node
Mqtt = #{
<<"await_rel_timeout">> => <<"300s">>,
<<"exclusive_subscription">> => false,
<<"idle_timeout">> => <<"15s">>
},
TargetMqtt1 = Mqtt#{<<"idle_timeout">> => <<"16s">>},
Confs0 = [
{node1, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"11s">>}, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"17s">>}, <<"stats">> => Stats}},
{node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}}
],
?assertEqual(
{inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0)
),
%% conf is the same, no changed
NoDiffConfs = [
{node1, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}
],
?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)),
%% same tnx_id
SameStatus = [
#{node => 'node1', tnx_id => 3},
#{node => 'node2', tnx_id => 3},
#{node => 'node3', tnx_id => 3}
],
%% same conf
?assertEqual(
{error, <<"All configuration has already been synchronized(3) successfully\n">>},
find_inconsistent(SameStatus, NoDiffConfs)
),
%% diff conf same tnx_id use the first one
?assertEqual(
{inconsistent_key, 3, [<<"mqtt">>]},
find_inconsistent(SameStatus, Confs0)
),
ok.
-endif.

View File

@ -37,6 +37,7 @@
]). ]).
-export([get_hocon_config/1, get_hocon_config/2]). -export([get_hocon_config/1, get_hocon_config/2]).
-export([get_raw_config/2]).
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
@ -114,6 +115,10 @@ get_override_config_file(Nodes) ->
get_hocon_config(Node) -> get_hocon_config(Node) ->
rpc:call(Node, emqx_conf_cli, get_config, []). rpc:call(Node, emqx_conf_cli, get_config, []).
-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}.
get_raw_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_raw_config, [KeyPath]).
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. -spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
get_hocon_config(Node, Key) -> get_hocon_config(Node, Key) ->
rpc:call(Node, emqx_conf_cli, get_config, [Key]). rpc:call(Node, emqx_conf_cli, get_config, [Key]).

View File

@ -141,12 +141,13 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
Res1 = gen_server:call(?NODE2, Call), Res1 = gen_server:call(?NODE2, Call),
Res2 = gen_server:call(?NODE3, Call), Res2 = gen_server:call(?NODE3, Call),
%% Node2 is retry on tnx_id 1, and should not run Next MFA. %% Node2 is retry on tnx_id 1, and should not run Next MFA.
?assertEqual( ?assertMatch(
{init_failure, #{ {init_failure, #{
msg => stale_view_of_cluster_state, msg := stale_view_of_cluster_state,
retry_times => 2, retry_times := 2,
cluster_tnx_id => 2, cluster_tnx_id := 2,
node_tnx_id => 1 node_tnx_id := 1,
suggested := _
}}, }},
Res1 Res1
), ),

View File

@ -44,7 +44,8 @@
pem_cache/1, pem_cache/1,
olp/1, olp/1,
data/1, data/1,
ds/1 ds/1,
cluster_info/0
]). ]).
-spec load() -> ok. -spec load() -> ok.
@ -53,7 +54,7 @@ load() ->
lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
is_cmd(Fun) -> is_cmd(Fun) ->
not lists:member(Fun, [init, load, module_info]). not lists:member(Fun, [init, load, module_info, cluster_info]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Node status %% @doc Node status