From 2069910ad10178c6451a75901f6e51defec8c7d1 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jun 2024 09:19:41 +0800 Subject: [PATCH 1/9] feat: add cluster fix command --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 7 +- .../src/emqx_cluster_rpc_cleaner.erl | 3 + apps/emqx_conf/src/emqx_conf_cli.erl | 336 ++++++++++++++++-- .../src/proto/emqx_conf_proto_v3.erl | 5 + .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 11 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 5 +- 6 files changed, 335 insertions(+), 32 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index e42505b49..a2c0a2478 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -501,11 +501,12 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> stale_view_of_cluster_msg(Meta, Count) -> Reason = Meta#{ - msg => stale_view_of_cluster_state, - retry_times => Count + msg => stale_view_of_cluster, + retry_times => Count, + suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" }, ?SLOG(warning, Reason), - Reason. + {error, Reason}. %% The entry point of a config change transaction. init_mfa(Node, MFA) -> diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl index 5b2552ad9..c4ea64aa6 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl @@ -94,6 +94,9 @@ del_stale_mfa(MaxHistory) -> ), delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). +%% Do nothing when cluster_rpc_commit is empty. +delete_stale_mfa(_, infinity, _Count) -> + ok; delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index f2b9d63b0..6c5f519bb 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -18,6 +18,7 @@ -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). -export([ load/0, @@ -38,6 +39,8 @@ -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). +-define(TIMEOUT, 30000). + -dialyzer({no_match, [load/0]}). @@ -90,13 +93,35 @@ admins(["skip", Node0]) -> Node = list_to_existing_atom(Node0), emqx_cluster_rpc:skip_failed_commit(Node), status(); -admins(["tnxid", TnxId0]) -> - %% changed to 'inspect' in 5.6 - %% TODO: delete this clause in 5.7 - admins(["inspect", TnxId0]); admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); +admins(["fix"]) -> + {atomic, Status} = emqx_cluster_rpc:status(), + %% find inconsistent in conf, but fix in raw way. + %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + AllConfs = find_running_confs(), + case find_inconsistent(Status, AllConfs) of + {inconsistent_tnx_id_key, Target, InconsistentKeys} -> + fix_inconsistent_with_raw(Target, InconsistentKeys); + inconsistent_tnx_id -> + print_tnx_id_status(Status), + ok = emqx_cluster_rpc:reset(), + emqx_ctl:print("Reset tnxid to 0 successfully~n"); + {inconsistent_key, TnxId, InconsistentKeys} -> + [{Target, _} | _] = AllConfs, + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning( + "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + [InconsistentKeys] + ), + emqx_ctl:warning("This is normal. This fix will not make any changes.~n"); + {error, Reason} -> + emqx_ctl:print(Reason) + end, + StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]); admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -118,6 +143,14 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). +fix_inconsistent_with_raw(Node, Keys) -> + Confs = [#{Key => emqx_conf_proto_v3:get_raw_config(Node, Key)} || Key <- Keys], + ok = emqx_cluster_rpc:reset(), + case load_config_from_raw(Confs, #{mode => replace}) of + ok -> waiting_for_fix_finish(); + Error -> Error + end. + audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). @@ -159,29 +192,52 @@ usage_sync() -> "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fast_forward [node] ", "Fast-forward config change to the given commit ID on the given node.\n" - "WARNING: This results in inconsistent configs among the clustered nodes."} + "WARNING: This results in inconsistent configs among the clustered nodes."}, + {"conf cluster_sync fix", + "Sync the node with the most comprehensive configuration to other node.\n" + "WARNING: typically the one with the highest tnxid."} ]. status() -> - emqx_ctl:print("-----------------------------------------------\n"), {atomic, Status} = emqx_cluster_rpc:status(), - lists:foreach( - fun(S) -> - #{ - node := Node, - tnx_id := TnxId, - mfa := {M, F, A}, - created_at := CreatedAt - } = S, + status(Status). + +status(Status) -> + emqx_ctl:print("-----------------------------------------------\n"), + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + AllConfs = find_running_confs(), + case find_inconsistent(Status, AllConfs) of + {inconsistent_tnx_id_key, TargetNode, InconsistentKeys} -> + emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), + print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs); + inconsistent_tnx_id -> + print_tnx_id_status(Status), emqx_ctl:print( - "~p:[~w] CreatedAt:~p ~p:~p/~w\n", - [Node, TnxId, CreatedAt, M, F, length(A)] - ) - end, - Status - ), + "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" + ); + {inconsistent_key, TnxId, InconsistentKeys} -> + [{Target, _} | _] = AllConfs, + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning( + "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + [InconsistentKeys] + ), + emqx_ctl:warning( + "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" + " is allowed but not recommended.~n" + ); + {error, Reason} -> + emqx_ctl:print(Reason) + end, + StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print("-----------------------------------------------\n"). +print_tnx_id_status(List0) -> + emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), + List1 = lists:map(fun(#{node := Node, tnx_id := TnxId}) -> {Node, TnxId} end, List0), + emqx_ctl:print("~p~n", [List1]). + print_keys(Keys) -> SortKeys = lists:sort(Keys), emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). @@ -500,8 +556,7 @@ filter_readonly_config(Raw) -> try RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), - ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], - {ok, maps:without(ReadOnlyKeys, Raw)} + {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)} catch throw:Error -> ?SLOG(error, #{ @@ -588,3 +643,240 @@ warning(_, Format, Args) -> emqx_ctl:warning(Format, Args). print(#{log := none}, _, _) -> ok; print(_, Format, Args) -> emqx_ctl:print(Format, Args). + +waiting_for_fix_finish() -> + timer:sleep(1000), + waiting_for_sync_finish(1). + +waiting_for_sync_finish(10) -> + emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n"); +waiting_for_sync_finish(Sec) -> + {atomic, Status} = emqx_cluster_rpc:status(), + case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of + [_] -> + emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); + _ -> + Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), + emqx_ctl:warning("sync: ~n", [Res]), + timer:sleep(1000), + waiting_for_sync_finish(Sec + 1) + end. + +find_inconsistent(Status, AllConfs) -> + case find_highest_node(Status) of + {same_tnx_id, TnxId} -> + %% check the conf is the same or not + [{_, TargetConf} | OtherConfs] = AllConfs, + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> + Msg = + <<"All configuration has already been synchronized(", + (integer_to_binary(TnxId))/binary, ") successfully\n">>, + {error, Msg}; + InconsistentKeys -> + {inconsistent_key, TnxId, InconsistentKeys} + end; + {ok, Target} -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> inconsistent_tnx_id; + ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys} + end + end. + +find_inconsistent_key(TargetConf, OtherConfs) -> + lists:usort( + lists:foldl( + fun({_Node, OtherConf}, Changed) -> + lists:filtermap( + fun({K, V}) -> changed(K, V, TargetConf) end, + maps:to_list(OtherConf) + ) ++ Changed + end, + [], + OtherConfs + ) + ). + +find_highest_node([]) -> + {same_tnx_id, 0}; +find_highest_node(Status) -> + Ids = [{Id, Node} || #{tnx_id := Id, node := Node} <- Status], + case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of + [{TnxId, _}] -> + {same_tnx_id, TnxId}; + [{_TnxId, Target} | _] -> + {ok, Target} + end. + +changed(K, V, Conf) -> + case maps:find(K, Conf) of + {ok, V1} when V =:= V1 -> false; + _ -> {true, K} + end. + +find_running_confs() -> + lists:map( + fun(Node) -> + Conf = emqx_conf_proto_v3:get_config(Node, []), + {Node, maps:without(?READONLY_KEYS, Conf)} + end, + mria:running_nodes() + ). + +print_inconsistent_conf(Keys, Target, Status, AllConfs) -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + TargetTnxId = get_tnx_id(Target, Status), + lists:foreach( + fun(Key) -> + lists:foreach( + fun({Node, OtherConf}) -> + TargetV = maps:get(Key, TargetConf), + PrevV = maps:get(Key, OtherConf), + NodeTnxId = get_tnx_id(Node, Status), + Options = #{ + key => Key, + node => {Node, NodeTnxId}, + target => {Target, TargetTnxId} + }, + print_inconsistent_conf(TargetV, PrevV, Options) + end, + OtherConfs + ) + end, + Keys + ). + +get_tnx_id(Node, Status) -> + case lists:filter(fun(#{node := Node0}) -> Node0 =:= Node end, Status) of + [] -> 0; + [#{tnx_id := TnxId}] -> TnxId + end. + +print_inconsistent_conf(SameConf, SameConf, _Options) -> + ok; +print_inconsistent_conf(New = #{}, Old = #{}, Options) -> + #{ + added := Added, + removed := Removed, + changed := Changed + } = emqx_utils_maps:diff_maps(New, Old), + RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n", + print_inconsistent(Removed, RemovedFmt, Options), + AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", + print_inconsistent(Added, AddedFmt, Options), + ChangedFmt = + "~ts(~w)'s ~s has been updated, but the changes have not been applied to ~ts(~w).~n", + print_inconsistent(Changed, ChangedFmt, Options); +%% authentication rewrite topic_metrics is list(not map). +print_inconsistent_conf(New, Old, Options) -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [ + Node, NodeTnxId, Key, Target, TargetTnxId + ]), + print_hocon(#{Node => Old, Target => New}). + +print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]), + NodeRawConf = emqx_conf_proto_v3:get_raw_config(Node, [Key]), + TargetRawConf = emqx_conf_proto_v3:get_raw_config(Target, [Key]), + {TargetConf, NodeConf} = + maps:fold( + fun(SubKey, _, {NewAcc, OldAcc}) -> + SubNew0 = maps:get(atom_to_binary(SubKey), NodeRawConf, undefined), + SubOld0 = maps:get(atom_to_binary(SubKey), TargetRawConf, undefined), + {SubNew1, SubOld1} = remove_identical_value(SubNew0, SubOld0), + {NewAcc#{SubKey => SubNew1}, OldAcc#{SubKey => SubOld1}} + end, + {#{}, #{}}, + Conf + ), + %% zones.default is a virtual zone. It will be changed when mqtt changes, + %% so we can't retrieve the raw data for zones.default(always undefined). + case TargetConf =:= NodeConf of + true -> ok; + false -> print_hocon(#{Target => #{Key => TargetConf}, Node => #{Key => NodeConf}}) + end; +print_inconsistent(_Conf, _Format, _Options) -> + ok. + +remove_identical_value(New = #{}, Old = #{}) -> + maps:fold( + fun(K, NewV, {Acc1, Acc2}) -> + case maps:find(K, Old) of + {ok, NewV} -> + {maps:remove(K, Acc1), maps:remove(K, Acc2)}; + {ok, OldV} -> + {NewV1, OldV1} = remove_identical_value(NewV, OldV), + {maps:put(K, NewV1, Acc1), maps:put(K, OldV1, Acc2)} + end + end, + {New, Old}, + New + ); +remove_identical_value(New, Old) -> + {New, Old}. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +find_inconsistent_test() -> + Status = [ + #{node => 'node1', tnx_id => 1}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 2} + ], + Stats = #{<<"enable">> => false}, + + %% chose the highest tnx_id node + Mqtt = #{ + <<"await_rel_timeout">> => <<"300s">>, + <<"exclusive_subscription">> => false, + <<"idle_timeout">> => <<"15s">> + }, + TargetMqtt1 = Mqtt#{<<"idle_timeout">> => <<"16s">>}, + Confs0 = [ + {node1, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"11s">>}, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"17s">>}, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} + ], + ?assertEqual( + {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0) + ), + + %% conf is the same, no changed + NoDiffConfs = [ + {node1, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} + ], + ?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)), + + %% same tnx_id + SameStatus = [ + #{node => 'node1', tnx_id => 3}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 3} + ], + %% same conf + ?assertEqual( + {error, <<"All configuration has already been synchronized(3) successfully\n">>}, + find_inconsistent(SameStatus, NoDiffConfs) + ), + %% diff conf same tnx_id use the first one + ?assertEqual( + {inconsistent_key, 3, [<<"mqtt">>]}, + find_inconsistent(SameStatus, Confs0) + ), + ok. + +-endif. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index 14d578ec1..e2e035879 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -37,6 +37,7 @@ ]). -export([get_hocon_config/1, get_hocon_config/2]). +-export([get_raw_config/2]). -include_lib("emqx/include/bpapi.hrl"). @@ -114,6 +115,10 @@ get_override_config_file(Nodes) -> get_hocon_config(Node) -> rpc:call(Node, emqx_conf_cli, get_config, []). +-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}. +get_raw_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_raw_config, [KeyPath]). + -spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. get_hocon_config(Node, Key) -> rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 97b80ab6b..bd336a619 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -141,12 +141,13 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> Res1 = gen_server:call(?NODE2, Call), Res2 = gen_server:call(?NODE3, Call), %% Node2 is retry on tnx_id 1, and should not run Next MFA. - ?assertEqual( + ?assertMatch( {init_failure, #{ - msg => stale_view_of_cluster_state, - retry_times => 2, - cluster_tnx_id => 2, - node_tnx_id => 1 + msg := stale_view_of_cluster_state, + retry_times := 2, + cluster_tnx_id := 2, + node_tnx_id := 1, + suggested := _ }}, Res1 ), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d327efe6..2742167f5 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -44,7 +44,8 @@ pem_cache/1, olp/1, data/1, - ds/1 + ds/1, + cluster_info/0 ]). -spec load() -> ok. @@ -53,7 +54,7 @@ load() -> lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). is_cmd(Fun) -> - not lists:member(Fun, [init, load, module_info]). + not lists:member(Fun, [init, load, module_info, cluster_info]). %%-------------------------------------------------------------------- %% @doc Node status From 3ed4340145ba93ca266590c31a10e582cbc998a4 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 10 Jun 2024 22:19:55 +0800 Subject: [PATCH 2/9] test: fix cluster_rpc test failed --- apps/emqx_conf/src/emqx_conf_cli.erl | 3 ++- apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 15 ++++++++------- changes/ce/feat-13202.en.md | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) create mode 100644 changes/ce/feat-13202.en.md diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 6c5f519bb..714198fef 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -104,7 +104,8 @@ admins(["fix"]) -> AllConfs = find_running_confs(), case find_inconsistent(Status, AllConfs) of {inconsistent_tnx_id_key, Target, InconsistentKeys} -> - fix_inconsistent_with_raw(Target, InconsistentKeys); + _ = fix_inconsistent_with_raw(Target, InconsistentKeys), + ok; inconsistent_tnx_id -> print_tnx_id_status(Status), ok = emqx_cluster_rpc:reset(), diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index bd336a619..4327ef92c 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -142,13 +142,14 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> Res2 = gen_server:call(?NODE3, Call), %% Node2 is retry on tnx_id 1, and should not run Next MFA. ?assertMatch( - {init_failure, #{ - msg := stale_view_of_cluster_state, - retry_times := 2, - cluster_tnx_id := 2, - node_tnx_id := 1, - suggested := _ - }}, + {init_failure, + {error, #{ + msg := stale_view_of_cluster_state, + retry_times := 2, + cluster_tnx_id := 2, + node_tnx_id := 1, + suggested := _ + }}}, Res1 ), ?assertEqual(Res1, Res2), diff --git a/changes/ce/feat-13202.en.md b/changes/ce/feat-13202.en.md new file mode 100644 index 000000000..6871f5acc --- /dev/null +++ b/changes/ce/feat-13202.en.md @@ -0,0 +1 @@ +Introduce `emqx_cli conf cluster_sync fix` to address cluster inconsistencies. It synchronizes the configuration of the node with the largest `tnx_id` to all nodes. From 5b105fcdbbcd855a7486858f9df14931bb01e7f2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 11 Jun 2024 11:33:13 +0800 Subject: [PATCH 3/9] chore: move emqx_conf_proto_v3 to emqx_conf_proto_v4 --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_conf/src/emqx_conf.erl | 18 +-- apps/emqx_conf/src/emqx_conf_app.erl | 4 +- apps/emqx_conf/src/emqx_conf_cli.erl | 8 +- .../src/proto/emqx_conf_proto_v3.erl | 5 - .../src/proto/emqx_conf_proto_v4.erl | 124 ++++++++++++++++++ .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 2 +- .../src/emqx_mgmt_api_configs.erl | 4 +- .../test/emqx_mgmt_api_configs_SUITE.erl | 4 +- 9 files changed, 145 insertions(+), 25 deletions(-) create mode 100644 apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 7c78d43d9..b80468104 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -16,6 +16,7 @@ {emqx_conf,1}. {emqx_conf,2}. {emqx_conf,3}. +{emqx_conf,4}. {emqx_connector,1}. {emqx_dashboard,1}. {emqx_delayed,1}. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 9ac4298bb..0749559b3 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -64,7 +64,7 @@ get_raw(KeyPath) -> %% @doc Returns all values in the cluster. -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> - {ResL, []} = emqx_conf_proto_v3:get_all(KeyPath), + {ResL, []} = emqx_conf_proto_v4:get_all(KeyPath), maps:from_list(ResL). %% @doc Returns the specified node's KeyPath, or exception if not found @@ -72,14 +72,14 @@ get_all(KeyPath) -> get_by_node(Node, KeyPath) when Node =:= node() -> emqx:get_config(KeyPath); get_by_node(Node, KeyPath) -> - emqx_conf_proto_v3:get_config(Node, KeyPath). + emqx_conf_proto_v4:get_config(Node, KeyPath). %% @doc Returns the specified node's KeyPath, or the default value if not found -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term(). get_by_node(Node, KeyPath, Default) when Node =:= node() -> emqx:get_config(KeyPath, Default); get_by_node(Node, KeyPath, Default) -> - emqx_conf_proto_v3:get_config(Node, KeyPath, Default). + emqx_conf_proto_v4:get_config(Node, KeyPath, Default). %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term(). @@ -94,7 +94,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts). + emqx_conf_proto_v4:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -107,7 +107,7 @@ update(KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); update(Node, KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts). + emqx_conf_proto_v4:update(Node, KeyPath, UpdateReq, Opts). %% @doc Mark the specified key path as tombstone tombstone(KeyPath, Opts) -> @@ -117,7 +117,7 @@ tombstone(KeyPath, Opts) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - emqx_conf_proto_v3:remove_config(KeyPath, Opts). + emqx_conf_proto_v4:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -125,13 +125,13 @@ remove(KeyPath, Opts) -> remove(Node, KeyPath, Opts) when Node =:= node() -> emqx:remove_config(KeyPath, Opts#{override_to => local}); remove(Node, KeyPath, Opts) -> - emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts). + emqx_conf_proto_v4:remove_config(Node, KeyPath, Opts). %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - emqx_conf_proto_v3:reset(KeyPath, Opts). + emqx_conf_proto_v4:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -139,7 +139,7 @@ reset(KeyPath, Opts) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - emqx_conf_proto_v3:reset(Node, KeyPath, Opts). + emqx_conf_proto_v4:reset(Node, KeyPath, Opts). %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 75e106c54..e6bdb5487 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -139,7 +139,7 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> - {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), + {Results, Failed} = emqx_conf_proto_v4:get_override_config_file(Nodes), {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), LogData = #{peer_nodes => Nodes, self_node => node()}, case Failed ++ NotReady of @@ -300,7 +300,7 @@ conf_sort({ok, _}, {ok, _}) -> false. sync_data_from_node(Node) -> - case emqx_conf_proto_v3:sync_data_from_node(Node) of + case emqx_conf_proto_v4:sync_data_from_node(Node) of {ok, DataBin} -> case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of {ok, []} -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 714198fef..09bf3ca3d 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -145,7 +145,7 @@ admins(_) -> emqx_ctl:usage(usage_sync()). fix_inconsistent_with_raw(Node, Keys) -> - Confs = [#{Key => emqx_conf_proto_v3:get_raw_config(Node, Key)} || Key <- Keys], + Confs = [#{Key => emqx_conf_proto_v4:get_raw_config(Node, Key)} || Key <- Keys], ok = emqx_cluster_rpc:reset(), case load_config_from_raw(Confs, #{mode => replace}) of ok -> waiting_for_fix_finish(); @@ -719,7 +719,7 @@ changed(K, V, Conf) -> find_running_confs() -> lists:map( fun(Node) -> - Conf = emqx_conf_proto_v3:get_config(Node, []), + Conf = emqx_conf_proto_v4:get_config(Node, []), {Node, maps:without(?READONLY_KEYS, Conf)} end, mria:running_nodes() @@ -788,8 +788,8 @@ print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> node := {Node, NodeTnxId} } = Options, emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]), - NodeRawConf = emqx_conf_proto_v3:get_raw_config(Node, [Key]), - TargetRawConf = emqx_conf_proto_v3:get_raw_config(Target, [Key]), + NodeRawConf = emqx_conf_proto_v4:get_raw_config(Node, [Key]), + TargetRawConf = emqx_conf_proto_v4:get_raw_config(Target, [Key]), {TargetConf, NodeConf} = maps:fold( fun(SubKey, _, {NewAcc, OldAcc}) -> diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index e2e035879..14d578ec1 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -37,7 +37,6 @@ ]). -export([get_hocon_config/1, get_hocon_config/2]). --export([get_raw_config/2]). -include_lib("emqx/include/bpapi.hrl"). @@ -115,10 +114,6 @@ get_override_config_file(Nodes) -> get_hocon_config(Node) -> rpc:call(Node, emqx_conf_cli, get_config, []). --spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}. -get_raw_config(Node, KeyPath) -> - rpc:call(Node, emqx, get_raw_config, [KeyPath]). - -spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. get_hocon_config(Node, Key) -> rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl new file mode 100644 index 000000000..8810ef8fd --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl @@ -0,0 +1,124 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v4). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + sync_data_from_node/1, + get_config/2, + get_config/3, + get_all/1, + + update/3, + update/4, + remove_config/2, + remove_config/3, + + reset/2, + reset/3, + + get_override_config_file/1 +]). + +-export([get_hocon_config/1, get_hocon_config/2]). +-export([get_raw_config/2]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.1". + +-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +sync_data_from_node(Node) -> + rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). +-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. + +-spec get_config(node(), emqx_utils_maps:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update( + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update( + node(), + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). + +-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). +get_override_config_file(Nodes) -> + rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). + +-spec get_hocon_config(node()) -> map() | {badrpc, _}. +get_hocon_config(Node) -> + rpc:call(Node, emqx_conf_cli, get_config, []). + +-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}. +get_raw_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_raw_config, [KeyPath]). + +-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. +get_hocon_config(Node, Key) -> + rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 4327ef92c..c2a4ca404 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -144,7 +144,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> ?assertMatch( {init_failure, {error, #{ - msg := stale_view_of_cluster_state, + msg := stale_view_of_cluster, retry_times := 2, cluster_tnx_id := 2, node_tnx_id := 1, diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 56e1f9bc1..b31140be8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -428,9 +428,9 @@ get_configs_v2(QueryStr) -> Conf = case maps:find(<<"key">>, QueryStr) of error -> - emqx_conf_proto_v3:get_hocon_config(Node); + emqx_conf_proto_v4:get_hocon_config(Node); {ok, Key} -> - emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key)) + emqx_conf_proto_v4:get_hocon_config(Node, atom_to_binary(Key)) end, { 200, diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 10906183b..066d4a45d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -240,11 +240,11 @@ t_configs_node({'init', Config}) -> (bad_node, _) -> {badrpc, bad} end, meck:expect(emqx_management_proto_v5, get_full_config, F), - meck:expect(emqx_conf_proto_v3, get_hocon_config, F2), + meck:expect(emqx_conf_proto_v4, get_hocon_config, F2), meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]); + meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v4, hocon_pp]); t_configs_node(_) -> Node = atom_to_list(node()), From 22fc3c49cc30d6a470c4f7d4494e784fafbe6a74 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 11 Jun 2024 22:19:54 +0800 Subject: [PATCH 4/9] chore: combine some common code into one function --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 2 +- apps/emqx_conf/src/emqx_conf_cli.erl | 87 +++++++++---------- .../src/proto/emqx_conf_proto_v3.erl | 4 + 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index a2c0a2478..6c90cf829 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -503,7 +503,7 @@ stale_view_of_cluster_msg(Meta, Count) -> Reason = Meta#{ msg => stale_view_of_cluster, retry_times => Count, - suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" + suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time" }, ?SLOG(warning, Reason), {error, Reason}. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 09bf3ca3d..e873804de 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -41,7 +41,6 @@ -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). -define(TIMEOUT, 30000). - -dialyzer({no_match, [load/0]}). load() -> @@ -97,32 +96,16 @@ admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); admins(["fix"]) -> - {atomic, Status} = emqx_cluster_rpc:status(), - %% find inconsistent in conf, but fix in raw way. - %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) - #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - AllConfs = find_running_confs(), - case find_inconsistent(Status, AllConfs) of - {inconsistent_tnx_id_key, Target, InconsistentKeys} -> - _ = fix_inconsistent_with_raw(Target, InconsistentKeys), - ok; - inconsistent_tnx_id -> - print_tnx_id_status(Status), - ok = emqx_cluster_rpc:reset(), - emqx_ctl:print("Reset tnxid to 0 successfully~n"); - {inconsistent_key, TnxId, InconsistentKeys} -> - [{Target, _} | _] = AllConfs, - print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), - emqx_ctl:warning( - "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", - [InconsistentKeys] - ), - emqx_ctl:warning("This is normal. This fix will not make any changes.~n"); - {error, Reason} -> - emqx_ctl:print(Reason) - end, - StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]); + case mria_rlog:role() of + core -> + {atomic, Status} = emqx_cluster_rpc:status(), + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_inconsistent(Status, #{fix => true}), + StoppedNodes =/= [] andalso + emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); + Role -> + emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role]) + end; admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -206,33 +189,47 @@ status() -> status(Status) -> emqx_ctl:print("-----------------------------------------------\n"), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_inconsistent(Status, #{fix => false}), + StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), + emqx_ctl:print("-----------------------------------------------\n"). + +maybe_fix_inconsistent(Status, #{fix := Fix}) -> + %% find inconsistent in conf, but fix in raw way. + %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), case find_inconsistent(Status, AllConfs) of - {inconsistent_tnx_id_key, TargetNode, InconsistentKeys} -> + {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> + _ = fix_inconsistent_with_raw(Target, InconsistentKeys), + ok; + {inconsistent_tnx_id_key, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), - print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs); + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); + inconsistent_tnx_id when Fix -> + print_tnx_id_status(Status), + ok = emqx_cluster_rpc:reset(), + emqx_ctl:print("Reset tnxid to 0 successfully~n"); inconsistent_tnx_id -> print_tnx_id_status(Status), - emqx_ctl:print( - "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" - ); + emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time"); {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [ + TnxId + ]), emqx_ctl:warning( - "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n", [InconsistentKeys] ), emqx_ctl:warning( "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" - " is allowed but not recommended.~n" - ); - {error, Reason} -> - emqx_ctl:print(Reason) - end, - StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]), - emqx_ctl:print("-----------------------------------------------\n"). + " is allowed but not recommended. " + ), + Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"), + ok; + {consistent, Msg} -> + emqx_ctl:print(Msg) + end. print_tnx_id_status(List0) -> emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), @@ -658,7 +655,7 @@ waiting_for_sync_finish(Sec) -> emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); _ -> Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), - emqx_ctl:warning("sync: ~n", [Res]), + emqx_ctl:warning("sync status: ~p~n", [Res]), timer:sleep(1000), waiting_for_sync_finish(Sec + 1) end. @@ -673,7 +670,7 @@ find_inconsistent(Status, AllConfs) -> Msg = <<"All configuration has already been synchronized(", (integer_to_binary(TnxId))/binary, ") successfully\n">>, - {error, Msg}; + {consistent, Msg}; InconsistentKeys -> {inconsistent_key, TnxId, InconsistentKeys} end; @@ -762,7 +759,7 @@ print_inconsistent_conf(New = #{}, Old = #{}, Options) -> removed := Removed, changed := Changed } = emqx_utils_maps:diff_maps(New, Old), - RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n", + RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they are still present on ~ts(~w).~n", print_inconsistent(Removed, RemovedFmt, Options), AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", print_inconsistent(Added, AddedFmt, Options), @@ -870,7 +867,7 @@ find_inconsistent_test() -> ], %% same conf ?assertEqual( - {error, <<"All configuration has already been synchronized(3) successfully\n">>}, + {consistent, <<"All configuration has already been synchronized(3) successfully\n">>}, find_inconsistent(SameStatus, NoDiffConfs) ), %% diff conf same tnx_id use the first one diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index 14d578ec1..8e5421d57 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, sync_data_from_node/1, get_config/2, get_config/3, @@ -43,6 +44,9 @@ introduced_in() -> "5.1.1". +deprecated_since() -> + "5.7.1". + -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). sync_data_from_node(Node) -> rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). From bdf3fc63a636258becd9983ec0f34b3127cededb Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 17 Jun 2024 11:40:44 +0800 Subject: [PATCH 5/9] chore: add config leader to suggestion --- apps/emqx_conf/include/emqx_conf.hrl | 11 ++++++++ apps/emqx_conf/src/emqx_cluster_rpc.erl | 23 +++++++++++++-- apps/emqx_conf/src/emqx_conf_cli.erl | 28 +++++++++++++++---- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 2 +- .../src/emqx_mgmt_api_configs.erl | 14 +++++----- 5 files changed, 62 insertions(+), 16 deletions(-) diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 042bf8d3c..10ac2b929 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -35,6 +35,17 @@ tnx_id :: pos_integer() | '$1' }). +-define(SUGGESTION(Node), + lists:flatten( + io_lib:format( + "run `./bin/emqx_ctl conf cluster_sync fix`" + " on ~p(config leader) to force sync the configs," + "when this node is lagging for more than 3 minutes,", + [Node] + ) + ) +). + -define(READONLY_KEYS, [cluster, rpc, node]). -endif. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 6c90cf829..eb605f977 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -28,6 +28,7 @@ reset/0, status/0, is_initiator/1, + find_leader/0, skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, @@ -227,6 +228,17 @@ status() -> is_initiator(Opts) -> ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE). +find_leader() -> + {atomic, Status} = status(), + case Status of + [#{node := N} | _] -> + N; + [] -> + %% running nodes already sort. + [N | _] = emqx:running_nodes(), + N + end. + %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. on_leave_clean() -> on_leave_clean(node()). @@ -500,10 +512,11 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> end. stale_view_of_cluster_msg(Meta, Count) -> + Node = find_leader(), Reason = Meta#{ msg => stale_view_of_cluster, retry_times => Count, - suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time" + suggestion => ?SUGGESTION(Node) }, ?SLOG(warning, Reason), {error, Reason}. @@ -537,7 +550,7 @@ transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). trans_status() -> - mnesia:foldl( + List = mnesia:foldl( fun(Rec, Acc) -> #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, case mnesia:read(?CLUSTER_MFA, TnxId) of @@ -560,6 +573,12 @@ trans_status() -> end, [], ?CLUSTER_COMMIT + ), + lists:sort( + fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) -> + {IdA, NA} > {IdB, NB} + end, + List ). trans_query(TnxId) -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index e873804de..7d044a36b 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -96,15 +96,23 @@ admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); admins(["fix"]) -> + {atomic, Status} = emqx_cluster_rpc:status(), case mria_rlog:role() of core -> - {atomic, Status} = emqx_cluster_rpc:status(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), maybe_fix_inconsistent(Status, #{fix => true}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> - emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role]) + Core = + case find_highest_node(Status) of + {same_tnx_id, _TnxId} -> + {ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD), + Node; + {ok, Node} -> + Node + end, + emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role]) end; admins(["fast_forward"]) -> status(), @@ -128,7 +136,14 @@ admins(_) -> emqx_ctl:usage(usage_sync()). fix_inconsistent_with_raw(Node, Keys) -> - Confs = [#{Key => emqx_conf_proto_v4:get_raw_config(Node, Key)} || Key <- Keys], + Confs = lists:foldl( + fun(Key, Acc) -> + KeyRaw = atom_to_binary(Key), + Acc#{KeyRaw => emqx_conf_proto_v4:get_raw_config(Node, [Key])} + end, + #{}, + Keys + ), ok = emqx_cluster_rpc:reset(), case load_config_from_raw(Confs, #{mode => replace}) of ok -> waiting_for_fix_finish(); @@ -179,7 +194,7 @@ usage_sync() -> "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fix", "Sync the node with the most comprehensive configuration to other node.\n" - "WARNING: typically the one with the highest tnxid."} + "WARNING: typically the config leader(with the highest tnxid)."} ]. status() -> @@ -210,7 +225,8 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> emqx_ctl:print("Reset tnxid to 0 successfully~n"); inconsistent_tnx_id -> print_tnx_id_status(Status), - emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time"); + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print(?SUGGESTION(Leader)); {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), @@ -223,7 +239,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> ), emqx_ctl:warning( "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" - " is allowed but not recommended. " + " is allowed but not recommended.~n" ), Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"), ok; diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index c2a4ca404..5dd005c02 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -148,7 +148,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> retry_times := 2, cluster_tnx_id := 2, node_tnx_id := 1, - suggested := _ + suggestion := _ }}}, Res1 ), diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index b31140be8..f2c74968e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -314,15 +314,15 @@ global_zone_configs(get, _Params, _Req) -> {200, get_zones()}; global_zone_configs(put, #{body := Body}, _Req) -> PrevZones = get_zones(), - Res = + {Res, Error} = maps:fold( - fun(Path, Value, Acc) -> + fun(Path, Value, {Acc, Error}) -> PrevValue = maps:get(Path, PrevZones), case Value =/= PrevValue of true -> case emqx_conf:update([Path], Value, ?OPTS) of {ok, #{raw_config := RawConf}} -> - Acc#{Path => RawConf}; + {Acc#{Path => RawConf}, Error}; {error, Reason} -> ?SLOG(error, #{ msg => "update_global_zone_failed", @@ -330,18 +330,18 @@ global_zone_configs(put, #{body := Body}, _Req) -> path => Path, value => Value }), - Acc + {Acc, Error#{Path => Reason}} end; false -> - Acc#{Path => Value} + {Acc#{Path => Value}, Error} end end, - #{}, + {#{}, #{}}, Body ), case maps:size(Res) =:= maps:size(Body) of true -> {200, Res}; - false -> {400, #{code => 'UPDATE_FAILED'}} + false -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Error)}} end. config_reset(post, _Params, Req) -> From 298211d1016aeda916b441dcc0491f0a876f063b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 4 Jul 2024 15:20:11 +0800 Subject: [PATCH 6/9] chore: apply suggestions from code review Co-authored-by: zmstone --- apps/emqx_conf/include/emqx_conf.hrl | 4 +-- apps/emqx_conf/src/emqx_conf_cli.erl | 40 ++++++++++++++++------------ 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 10ac2b929..53f309856 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -39,8 +39,8 @@ lists:flatten( io_lib:format( "run `./bin/emqx_ctl conf cluster_sync fix`" - " on ~p(config leader) to force sync the configs," - "when this node is lagging for more than 3 minutes,", + " on ~p(config leader) to force sync the configs, " + "if this node has been lagging for more than 3 minutes.", [Node] ) ) diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 7d044a36b..76ac83f16 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -100,7 +100,7 @@ admins(["fix"]) -> case mria_rlog:role() of core -> #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - maybe_fix_inconsistent(Status, #{fix => true}), + maybe_fix_lagging(Status, #{fix => true}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> @@ -135,7 +135,7 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). -fix_inconsistent_with_raw(Node, Keys) -> +fix_lagging_with_raw(Node, Keys) -> Confs = lists:foldl( fun(Key, Acc) -> KeyRaw = atom_to_binary(Key), @@ -204,17 +204,17 @@ status() -> status(Status) -> emqx_ctl:print("-----------------------------------------------\n"), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - maybe_fix_inconsistent(Status, #{fix => false}), + maybe_fix_lagging(Status, #{fix => false}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print("-----------------------------------------------\n"). -maybe_fix_inconsistent(Status, #{fix := Fix}) -> +maybe_fix_lagging(Status, #{fix := Fix}) -> %% find inconsistent in conf, but fix in raw way. %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), - case find_inconsistent(Status, AllConfs) of + case find_lagging(Status, AllConfs) of {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> - _ = fix_inconsistent_with_raw(Target, InconsistentKeys), + _ = fix_lagging_with_raw(Target, InconsistentKeys), ok; {inconsistent_tnx_id_key, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), @@ -230,7 +230,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [ + emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [ TnxId ]), emqx_ctl:warning( @@ -663,7 +663,10 @@ waiting_for_fix_finish() -> waiting_for_sync_finish(1). waiting_for_sync_finish(10) -> - emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n"); + emqx_ctl:warning( + "Config is still not in sync after 10s.~n" + "It may need more time, and check the logs in the lagging nodes.~n" + ); waiting_for_sync_finish(Sec) -> {atomic, Status} = emqx_cluster_rpc:status(), case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of @@ -676,7 +679,7 @@ waiting_for_sync_finish(Sec) -> waiting_for_sync_finish(Sec + 1) end. -find_inconsistent(Status, AllConfs) -> +find_lagging(Status, AllConfs) -> case find_highest_node(Status) of {same_tnx_id, TnxId} -> %% check the conf is the same or not @@ -684,7 +687,7 @@ find_inconsistent(Status, AllConfs) -> case find_inconsistent_key(TargetConf, OtherConfs) of [] -> Msg = - <<"All configuration has already been synchronized(", + <<"All configuration synchronized(tnx_id=", (integer_to_binary(TnxId))/binary, ") successfully\n">>, {consistent, Msg}; InconsistentKeys -> @@ -789,10 +792,13 @@ print_inconsistent_conf(New, Old, Options) -> target := {Target, TargetTnxId}, node := {Node, NodeTnxId} } = Options, - emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [ + emqx_ctl:print("~ts(tnx_id=~w)'s ~s is diff from ~ts(tnx_id=~w).~n", [ Node, NodeTnxId, Key, Target, TargetTnxId ]), - print_hocon(#{Node => Old, Target => New}). + emqx_ctl:print("~ts:~n", [Node]), + print_hocon(Old), + emqx_ctl:print("~ts:~n", [Target]), + print_hocon(New). print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> #{ @@ -864,7 +870,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} ], ?assertEqual( - {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0) + {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) ), %% conf is the same, no changed @@ -873,7 +879,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} ], - ?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)), + ?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)), %% same tnx_id SameStatus = [ @@ -883,13 +889,13 @@ find_inconsistent_test() -> ], %% same conf ?assertEqual( - {consistent, <<"All configuration has already been synchronized(3) successfully\n">>}, - find_inconsistent(SameStatus, NoDiffConfs) + {consistent, <<"All configuration synchronized(tnx_id=3) successfully\n">>}, + find_lagging(SameStatus, NoDiffConfs) ), %% diff conf same tnx_id use the first one ?assertEqual( {inconsistent_key, 3, [<<"mqtt">>]}, - find_inconsistent(SameStatus, Confs0) + find_lagging(SameStatus, Confs0) ), ok. From f490a0cba20cb56aacc8aa1693d1bc36518c5554 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 5 Jul 2024 09:00:32 +0800 Subject: [PATCH 7/9] feat: don't reset tnx_id when cluster_fix --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 44 +++++++++++++++- apps/emqx_conf/src/emqx_conf_cli.erl | 69 ++++++++++++++----------- 2 files changed, 80 insertions(+), 33 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index eb605f977..283e17ed7 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -34,7 +34,8 @@ on_mria_stop/1, force_leave_clean/1, wait_for_cluster_rpc/0, - maybe_init_tnx_id/2 + maybe_init_tnx_id/2, + update_mfa/3 ]). -export([ commit/2, @@ -42,6 +43,7 @@ get_cluster_tnx_id/0, get_node_tnx_id/1, init_mfa/2, + update_mfa_in_trans/3, latest_tnx_id/0, make_initiate_call_req/3, read_next_mfa/1, @@ -546,6 +548,36 @@ init_mfa(Node, MFA) -> {retry, Meta} end. +update_mfa_in_trans(Node, MFA, NodeTnxId) -> + mnesia:write_lock_table(?CLUSTER_MFA), + case get_node_tnx_id(Node) of + NodeTnxId -> + TnxId = NodeTnxId + 1, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + lists:foreach( + fun(N) -> + ok = emqx_cluster_rpc:commit(N, NodeTnxId) + end, + mria:running_nodes() + ); + NewTnxId -> + Fmt = "someone_has_already_updated,tnx_id(~w) is not the latest(~w)", + Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]), + mnesia:abort({error, Reason}) + end. + +update_mfa(Node, MFA, LatestId) -> + case transaction(fun ?MODULE:update_mfa_in_trans/3, [Node, MFA, LatestId]) of + {atomic, ok} -> ok; + {aborted, Error} -> Error + end. + transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). @@ -574,13 +606,21 @@ trans_status() -> [], ?CLUSTER_COMMIT ), + Nodes = mria:running_nodes(), + IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))), lists:sort( fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) -> - {IdA, NA} > {IdB, NB} + {IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)} end, List ). +index_nodes(Node, IndexNodes) -> + case lists:keyfind(Node, 1, IndexNodes) of + false -> 0; + {_, Index} -> Index + end. + trans_query(TnxId) -> case mnesia:read(?CLUSTER_MFA, TnxId) of [] -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 76ac83f16..d332d31d7 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -25,7 +25,8 @@ admins/1, conf/1, audit/3, - unload/0 + unload/0, + mark_fix_log/1 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). @@ -104,15 +105,8 @@ admins(["fix"]) -> StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> - Core = - case find_highest_node(Status) of - {same_tnx_id, _TnxId} -> - {ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD), - Node; - {ok, Node} -> - Node - end, - emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role]) + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role]) end; admins(["fast_forward"]) -> status(), @@ -135,7 +129,7 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). -fix_lagging_with_raw(Node, Keys) -> +fix_lagging_with_raw(ToTnxId, Node, Keys) -> Confs = lists:foldl( fun(Key, Acc) -> KeyRaw = atom_to_binary(Key), @@ -144,12 +138,25 @@ fix_lagging_with_raw(Node, Keys) -> #{}, Keys ), - ok = emqx_cluster_rpc:reset(), - case load_config_from_raw(Confs, #{mode => replace}) of - ok -> waiting_for_fix_finish(); - Error -> Error + case mark_fix_begin(Node, ToTnxId) of + ok -> + case load_config_from_raw(Confs, #{mode => replace}) of + ok -> waiting_for_fix_finish(); + Error0 -> Error0 + end; + {error, Reason} -> + emqx_ctl:warning("mark fix begin failed: ~s~n", [Reason]) end. +mark_fix_begin(Node, TnxId) -> + {atomic, Status} = emqx_cluster_rpc:status(), + MFA = {?MODULE, mark_fix_log, [Status]}, + emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). + +mark_fix_log(Status) -> + ?SLOG(warning, #{msg => cluster_fix_log, status => Status}), + ok. + audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). @@ -213,25 +220,25 @@ maybe_fix_lagging(Status, #{fix := Fix}) -> %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), case find_lagging(Status, AllConfs) of - {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> - _ = fix_lagging_with_raw(Target, InconsistentKeys), + {inconsistent_tnx_id_key, ToTnxId, Target, InconsistentKeys} when Fix -> + _ = fix_lagging_with_raw(ToTnxId, Target, InconsistentKeys), ok; - {inconsistent_tnx_id_key, Target, InconsistentKeys} -> + {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); - inconsistent_tnx_id when Fix -> + {inconsistent_tnx_id, Target, ToTnxId} when Fix -> print_tnx_id_status(Status), - ok = emqx_cluster_rpc:reset(), - emqx_ctl:print("Reset tnxid to 0 successfully~n"); - inconsistent_tnx_id -> + ok = mark_fix_begin(Target, ToTnxId), + emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); + {inconsistent_tnx_id, _Target, _ToTnxId} -> print_tnx_id_status(Status), Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print(?SUGGESTION(Leader)); - {inconsistent_key, TnxId, InconsistentKeys} -> + {inconsistent_key, ToTnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [ - TnxId + ToTnxId ]), emqx_ctl:warning( "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n", @@ -693,11 +700,11 @@ find_lagging(Status, AllConfs) -> InconsistentKeys -> {inconsistent_key, TnxId, InconsistentKeys} end; - {ok, Target} -> + {ok, TargetId, Target} -> {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), case find_inconsistent_key(TargetConf, OtherConfs) of - [] -> inconsistent_tnx_id; - ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys} + [] -> {inconsistent_tnx_id, TargetId, Target}; + ChangedKeys -> {inconsistent_tnx_id_key, TargetId, Target, ChangedKeys} end end. @@ -722,8 +729,8 @@ find_highest_node(Status) -> case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of [{TnxId, _}] -> {same_tnx_id, TnxId}; - [{_TnxId, Target} | _] -> - {ok, Target} + [{TnxId, Target} | _] -> + {ok, TnxId, Target} end. changed(K, V, Conf) -> @@ -870,7 +877,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} ], ?assertEqual( - {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) + {inconsistent_tnx_id_key, 3, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) ), %% conf is the same, no changed @@ -879,7 +886,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} ], - ?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)), + ?assertEqual({inconsistent_tnx_id, 3, node2}, find_lagging(Status, NoDiffConfs)), %% same tnx_id SameStatus = [ From 457ea93570aa3dab1cfa4770b77dd50b256f5998 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 5 Jul 2024 14:10:04 +0800 Subject: [PATCH 8/9] test: add cluster_sync cli test --- apps/emqx_conf/src/emqx_conf_cli.erl | 22 ++- .../test/emqx_conf_cluster_sync_SUITE.erl | 158 ++++++++++++++++++ 2 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index d332d31d7..864d8e08a 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -26,7 +26,7 @@ conf/1, audit/3, unload/0, - mark_fix_log/1 + mark_fix_log/2 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). @@ -103,7 +103,8 @@ admins(["fix"]) -> #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), maybe_fix_lagging(Status, #{fix => true}), StoppedNodes =/= [] andalso - emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); + emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), + ok; Role -> Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role]) @@ -153,8 +154,8 @@ mark_fix_begin(Node, TnxId) -> MFA = {?MODULE, mark_fix_log, [Status]}, emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). -mark_fix_log(Status) -> - ?SLOG(warning, #{msg => cluster_fix_log, status => Status}), +mark_fix_log(Status, Opts) -> + ?SLOG(warning, #{msg => cluster_fix_log, status => Status, opts => Opts}), ok. audit(Level, From, Log) -> @@ -226,11 +227,16 @@ maybe_fix_lagging(Status, #{fix := Fix}) -> {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); - {inconsistent_tnx_id, Target, ToTnxId} when Fix -> + {inconsistent_tnx_id, ToTnxId, Target} when Fix -> print_tnx_id_status(Status), - ok = mark_fix_begin(Target, ToTnxId), - emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); - {inconsistent_tnx_id, _Target, _ToTnxId} -> + case mark_fix_begin(Target, ToTnxId) of + ok -> + waiting_for_fix_finish(), + emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); + Error -> + Error + end; + {inconsistent_tnx_id, _ToTnxId, _Target} -> print_tnx_id_status(Status), Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print(?SUGGESTION(Leader)); diff --git a/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl new file mode 100644 index 000000000..093fe591f --- /dev/null +++ b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_cluster_sync_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_conf.hrl"). + +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = ?config(priv_dir, Config), + Cluster = mk_cluster_spec(#{}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), + [{cluster_nodes, Nodes} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)). + +t_fix(Config) -> + [Node1, Node2] = ?config(cluster_nodes, Config), + ?ON(Node1, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node2, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 100}, #{})), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + ?ON( + Node1, + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + ), + %% fix normal, nothing changed + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + %% fix inconsistent_key. tnx_id is the same, so nothing changed. + emqx_conf_proto_v4:update(Node1, [<<"mqtt">>], #{<<"max_topic_levels">> => 99}, #{}), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id_key. tnx_id and key are updated. + ?ON(Node1, fake_mfa(2, Node1, {?MODULE, undef, []})), + %% 2 -> fake_mfa, 3-> mark_begin_log, 4-> mqtt 5 -> zones + ?ON(Node2, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 5}, + #{node := Node1, tnx_id := 5} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id. tnx_id is updated. + {ok, _} = ?ON( + Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 98}, #{}) + ), + ?ON(Node2, fake_mfa(7, Node2, {?MODULE, undef1, []})), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + %% unchanged + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ok. + +fake_mfa(TnxId, Node, MFA) -> + Func = fun() -> + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = emqx_cluster_rpc:commit(Node, TnxId) + end, + {atomic, ok} = mria:transaction(?CLUSTER_RPC_SHARD, Func, []), + ok. + +mk_cluster_spec(Opts) -> + Conf = #{ + listeners => #{ + tcp => #{default => <<"marked_for_deletion">>}, + ssl => #{default => <<"marked_for_deletion">>}, + ws => #{default => <<"marked_for_deletion">>}, + wss => #{default => <<"marked_for_deletion">>} + } + }, + Apps = [ + {emqx, #{config => Conf}}, + {emqx_conf, #{config => Conf}} + ], + [ + {emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Apps}}, + {emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Apps}} + ]. From 820789a09f5e6c4832f90e05eb307520e3af28c2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 8 Jul 2024 17:32:45 +0800 Subject: [PATCH 9/9] fix: redact status when mark_fix_log begin --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 8 ++++---- apps/emqx_conf/src/emqx_conf_cli.erl | 6 +++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 283e17ed7..7f7a44faf 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -43,7 +43,7 @@ get_cluster_tnx_id/0, get_node_tnx_id/1, init_mfa/2, - update_mfa_in_trans/3, + force_sync_tnx_id/3, latest_tnx_id/0, make_initiate_call_req/3, read_next_mfa/1, @@ -548,7 +548,7 @@ init_mfa(Node, MFA) -> {retry, Meta} end. -update_mfa_in_trans(Node, MFA, NodeTnxId) -> +force_sync_tnx_id(Node, MFA, NodeTnxId) -> mnesia:write_lock_table(?CLUSTER_MFA), case get_node_tnx_id(Node) of NodeTnxId -> @@ -567,13 +567,13 @@ update_mfa_in_trans(Node, MFA, NodeTnxId) -> mria:running_nodes() ); NewTnxId -> - Fmt = "someone_has_already_updated,tnx_id(~w) is not the latest(~w)", + Fmt = "aborted_force_sync, tnx_id(~w) is not the latest(~w)", Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]), mnesia:abort({error, Reason}) end. update_mfa(Node, MFA, LatestId) -> - case transaction(fun ?MODULE:update_mfa_in_trans/3, [Node, MFA, LatestId]) of + case transaction(fun ?MODULE:force_sync_tnx_id/3, [Node, MFA, LatestId]) of {atomic, ok} -> ok; {aborted, Error} -> Error end. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 864d8e08a..e730f23c0 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -155,7 +155,11 @@ mark_fix_begin(Node, TnxId) -> emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). mark_fix_log(Status, Opts) -> - ?SLOG(warning, #{msg => cluster_fix_log, status => Status, opts => Opts}), + ?SLOG(warning, #{ + msg => cluster_config_sync_triggered, + status => emqx_utils:redact(Status), + opts => Opts + }), ok. audit(Level, From, Log) ->