diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index e42505b49..a2c0a2478 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -501,11 +501,12 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> stale_view_of_cluster_msg(Meta, Count) -> Reason = Meta#{ - msg => stale_view_of_cluster_state, - retry_times => Count + msg => stale_view_of_cluster, + retry_times => Count, + suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" }, ?SLOG(warning, Reason), - Reason. + {error, Reason}. %% The entry point of a config change transaction. init_mfa(Node, MFA) -> diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl index 5b2552ad9..c4ea64aa6 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl @@ -94,6 +94,9 @@ del_stale_mfa(MaxHistory) -> ), delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). +%% Do nothing when cluster_rpc_commit is empty. +delete_stale_mfa(_, infinity, _Count) -> + ok; delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index f2b9d63b0..6c5f519bb 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -18,6 +18,7 @@ -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). -export([ load/0, @@ -38,6 +39,8 @@ -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). +-define(TIMEOUT, 30000). + -dialyzer({no_match, [load/0]}). @@ -90,13 +93,35 @@ admins(["skip", Node0]) -> Node = list_to_existing_atom(Node0), emqx_cluster_rpc:skip_failed_commit(Node), status(); -admins(["tnxid", TnxId0]) -> - %% changed to 'inspect' in 5.6 - %% TODO: delete this clause in 5.7 - admins(["inspect", TnxId0]); admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); +admins(["fix"]) -> + {atomic, Status} = emqx_cluster_rpc:status(), + %% find inconsistent in conf, but fix in raw way. + %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + AllConfs = find_running_confs(), + case find_inconsistent(Status, AllConfs) of + {inconsistent_tnx_id_key, Target, InconsistentKeys} -> + fix_inconsistent_with_raw(Target, InconsistentKeys); + inconsistent_tnx_id -> + print_tnx_id_status(Status), + ok = emqx_cluster_rpc:reset(), + emqx_ctl:print("Reset tnxid to 0 successfully~n"); + {inconsistent_key, TnxId, InconsistentKeys} -> + [{Target, _} | _] = AllConfs, + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning( + "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + [InconsistentKeys] + ), + emqx_ctl:warning("This is normal. This fix will not make any changes.~n"); + {error, Reason} -> + emqx_ctl:print(Reason) + end, + StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]); admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -118,6 +143,14 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). +fix_inconsistent_with_raw(Node, Keys) -> + Confs = [#{Key => emqx_conf_proto_v3:get_raw_config(Node, Key)} || Key <- Keys], + ok = emqx_cluster_rpc:reset(), + case load_config_from_raw(Confs, #{mode => replace}) of + ok -> waiting_for_fix_finish(); + Error -> Error + end. + audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). @@ -159,29 +192,52 @@ usage_sync() -> "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fast_forward [node] ", "Fast-forward config change to the given commit ID on the given node.\n" - "WARNING: This results in inconsistent configs among the clustered nodes."} + "WARNING: This results in inconsistent configs among the clustered nodes."}, + {"conf cluster_sync fix", + "Sync the node with the most comprehensive configuration to other node.\n" + "WARNING: typically the one with the highest tnxid."} ]. status() -> - emqx_ctl:print("-----------------------------------------------\n"), {atomic, Status} = emqx_cluster_rpc:status(), - lists:foreach( - fun(S) -> - #{ - node := Node, - tnx_id := TnxId, - mfa := {M, F, A}, - created_at := CreatedAt - } = S, + status(Status). + +status(Status) -> + emqx_ctl:print("-----------------------------------------------\n"), + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + AllConfs = find_running_confs(), + case find_inconsistent(Status, AllConfs) of + {inconsistent_tnx_id_key, TargetNode, InconsistentKeys} -> + emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), + print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs); + inconsistent_tnx_id -> + print_tnx_id_status(Status), emqx_ctl:print( - "~p:[~w] CreatedAt:~p ~p:~p/~w\n", - [Node, TnxId, CreatedAt, M, F, length(A)] - ) - end, - Status - ), + "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" + ); + {inconsistent_key, TnxId, InconsistentKeys} -> + [{Target, _} | _] = AllConfs, + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning( + "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + [InconsistentKeys] + ), + emqx_ctl:warning( + "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" + " is allowed but not recommended.~n" + ); + {error, Reason} -> + emqx_ctl:print(Reason) + end, + StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print("-----------------------------------------------\n"). +print_tnx_id_status(List0) -> + emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), + List1 = lists:map(fun(#{node := Node, tnx_id := TnxId}) -> {Node, TnxId} end, List0), + emqx_ctl:print("~p~n", [List1]). + print_keys(Keys) -> SortKeys = lists:sort(Keys), emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). @@ -500,8 +556,7 @@ filter_readonly_config(Raw) -> try RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), - ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], - {ok, maps:without(ReadOnlyKeys, Raw)} + {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)} catch throw:Error -> ?SLOG(error, #{ @@ -588,3 +643,240 @@ warning(_, Format, Args) -> emqx_ctl:warning(Format, Args). print(#{log := none}, _, _) -> ok; print(_, Format, Args) -> emqx_ctl:print(Format, Args). + +waiting_for_fix_finish() -> + timer:sleep(1000), + waiting_for_sync_finish(1). + +waiting_for_sync_finish(10) -> + emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n"); +waiting_for_sync_finish(Sec) -> + {atomic, Status} = emqx_cluster_rpc:status(), + case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of + [_] -> + emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); + _ -> + Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), + emqx_ctl:warning("sync: ~n", [Res]), + timer:sleep(1000), + waiting_for_sync_finish(Sec + 1) + end. + +find_inconsistent(Status, AllConfs) -> + case find_highest_node(Status) of + {same_tnx_id, TnxId} -> + %% check the conf is the same or not + [{_, TargetConf} | OtherConfs] = AllConfs, + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> + Msg = + <<"All configuration has already been synchronized(", + (integer_to_binary(TnxId))/binary, ") successfully\n">>, + {error, Msg}; + InconsistentKeys -> + {inconsistent_key, TnxId, InconsistentKeys} + end; + {ok, Target} -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> inconsistent_tnx_id; + ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys} + end + end. + +find_inconsistent_key(TargetConf, OtherConfs) -> + lists:usort( + lists:foldl( + fun({_Node, OtherConf}, Changed) -> + lists:filtermap( + fun({K, V}) -> changed(K, V, TargetConf) end, + maps:to_list(OtherConf) + ) ++ Changed + end, + [], + OtherConfs + ) + ). + +find_highest_node([]) -> + {same_tnx_id, 0}; +find_highest_node(Status) -> + Ids = [{Id, Node} || #{tnx_id := Id, node := Node} <- Status], + case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of + [{TnxId, _}] -> + {same_tnx_id, TnxId}; + [{_TnxId, Target} | _] -> + {ok, Target} + end. + +changed(K, V, Conf) -> + case maps:find(K, Conf) of + {ok, V1} when V =:= V1 -> false; + _ -> {true, K} + end. + +find_running_confs() -> + lists:map( + fun(Node) -> + Conf = emqx_conf_proto_v3:get_config(Node, []), + {Node, maps:without(?READONLY_KEYS, Conf)} + end, + mria:running_nodes() + ). + +print_inconsistent_conf(Keys, Target, Status, AllConfs) -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + TargetTnxId = get_tnx_id(Target, Status), + lists:foreach( + fun(Key) -> + lists:foreach( + fun({Node, OtherConf}) -> + TargetV = maps:get(Key, TargetConf), + PrevV = maps:get(Key, OtherConf), + NodeTnxId = get_tnx_id(Node, Status), + Options = #{ + key => Key, + node => {Node, NodeTnxId}, + target => {Target, TargetTnxId} + }, + print_inconsistent_conf(TargetV, PrevV, Options) + end, + OtherConfs + ) + end, + Keys + ). + +get_tnx_id(Node, Status) -> + case lists:filter(fun(#{node := Node0}) -> Node0 =:= Node end, Status) of + [] -> 0; + [#{tnx_id := TnxId}] -> TnxId + end. + +print_inconsistent_conf(SameConf, SameConf, _Options) -> + ok; +print_inconsistent_conf(New = #{}, Old = #{}, Options) -> + #{ + added := Added, + removed := Removed, + changed := Changed + } = emqx_utils_maps:diff_maps(New, Old), + RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n", + print_inconsistent(Removed, RemovedFmt, Options), + AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", + print_inconsistent(Added, AddedFmt, Options), + ChangedFmt = + "~ts(~w)'s ~s has been updated, but the changes have not been applied to ~ts(~w).~n", + print_inconsistent(Changed, ChangedFmt, Options); +%% authentication rewrite topic_metrics is list(not map). +print_inconsistent_conf(New, Old, Options) -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [ + Node, NodeTnxId, Key, Target, TargetTnxId + ]), + print_hocon(#{Node => Old, Target => New}). + +print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]), + NodeRawConf = emqx_conf_proto_v3:get_raw_config(Node, [Key]), + TargetRawConf = emqx_conf_proto_v3:get_raw_config(Target, [Key]), + {TargetConf, NodeConf} = + maps:fold( + fun(SubKey, _, {NewAcc, OldAcc}) -> + SubNew0 = maps:get(atom_to_binary(SubKey), NodeRawConf, undefined), + SubOld0 = maps:get(atom_to_binary(SubKey), TargetRawConf, undefined), + {SubNew1, SubOld1} = remove_identical_value(SubNew0, SubOld0), + {NewAcc#{SubKey => SubNew1}, OldAcc#{SubKey => SubOld1}} + end, + {#{}, #{}}, + Conf + ), + %% zones.default is a virtual zone. It will be changed when mqtt changes, + %% so we can't retrieve the raw data for zones.default(always undefined). + case TargetConf =:= NodeConf of + true -> ok; + false -> print_hocon(#{Target => #{Key => TargetConf}, Node => #{Key => NodeConf}}) + end; +print_inconsistent(_Conf, _Format, _Options) -> + ok. + +remove_identical_value(New = #{}, Old = #{}) -> + maps:fold( + fun(K, NewV, {Acc1, Acc2}) -> + case maps:find(K, Old) of + {ok, NewV} -> + {maps:remove(K, Acc1), maps:remove(K, Acc2)}; + {ok, OldV} -> + {NewV1, OldV1} = remove_identical_value(NewV, OldV), + {maps:put(K, NewV1, Acc1), maps:put(K, OldV1, Acc2)} + end + end, + {New, Old}, + New + ); +remove_identical_value(New, Old) -> + {New, Old}. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +find_inconsistent_test() -> + Status = [ + #{node => 'node1', tnx_id => 1}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 2} + ], + Stats = #{<<"enable">> => false}, + + %% chose the highest tnx_id node + Mqtt = #{ + <<"await_rel_timeout">> => <<"300s">>, + <<"exclusive_subscription">> => false, + <<"idle_timeout">> => <<"15s">> + }, + TargetMqtt1 = Mqtt#{<<"idle_timeout">> => <<"16s">>}, + Confs0 = [ + {node1, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"11s">>}, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"17s">>}, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} + ], + ?assertEqual( + {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0) + ), + + %% conf is the same, no changed + NoDiffConfs = [ + {node1, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} + ], + ?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)), + + %% same tnx_id + SameStatus = [ + #{node => 'node1', tnx_id => 3}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 3} + ], + %% same conf + ?assertEqual( + {error, <<"All configuration has already been synchronized(3) successfully\n">>}, + find_inconsistent(SameStatus, NoDiffConfs) + ), + %% diff conf same tnx_id use the first one + ?assertEqual( + {inconsistent_key, 3, [<<"mqtt">>]}, + find_inconsistent(SameStatus, Confs0) + ), + ok. + +-endif. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index 14d578ec1..e2e035879 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -37,6 +37,7 @@ ]). -export([get_hocon_config/1, get_hocon_config/2]). +-export([get_raw_config/2]). -include_lib("emqx/include/bpapi.hrl"). @@ -114,6 +115,10 @@ get_override_config_file(Nodes) -> get_hocon_config(Node) -> rpc:call(Node, emqx_conf_cli, get_config, []). +-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}. +get_raw_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_raw_config, [KeyPath]). + -spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. get_hocon_config(Node, Key) -> rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 97b80ab6b..bd336a619 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -141,12 +141,13 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> Res1 = gen_server:call(?NODE2, Call), Res2 = gen_server:call(?NODE3, Call), %% Node2 is retry on tnx_id 1, and should not run Next MFA. - ?assertEqual( + ?assertMatch( {init_failure, #{ - msg => stale_view_of_cluster_state, - retry_times => 2, - cluster_tnx_id => 2, - node_tnx_id => 1 + msg := stale_view_of_cluster_state, + retry_times := 2, + cluster_tnx_id := 2, + node_tnx_id := 1, + suggested := _ }}, Res1 ), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d327efe6..2742167f5 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -44,7 +44,8 @@ pem_cache/1, olp/1, data/1, - ds/1 + ds/1, + cluster_info/0 ]). -spec load() -> ok. @@ -53,7 +54,7 @@ load() -> lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). is_cmd(Fun) -> - not lists:member(Fun, [init, load, module_info]). + not lists:member(Fun, [init, load, module_info, cluster_info]). %%-------------------------------------------------------------------- %% @doc Node status