diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 042bf8d3c..10ac2b929 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -35,6 +35,17 @@ tnx_id :: pos_integer() | '$1' }). +-define(SUGGESTION(Node), + lists:flatten( + io_lib:format( + "run `./bin/emqx_ctl conf cluster_sync fix`" + " on ~p(config leader) to force sync the configs," + "when this node is lagging for more than 3 minutes,", + [Node] + ) + ) +). + -define(READONLY_KEYS, [cluster, rpc, node]). -endif. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 6c90cf829..eb605f977 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -28,6 +28,7 @@ reset/0, status/0, is_initiator/1, + find_leader/0, skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, @@ -227,6 +228,17 @@ status() -> is_initiator(Opts) -> ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE). +find_leader() -> + {atomic, Status} = status(), + case Status of + [#{node := N} | _] -> + N; + [] -> + %% running nodes already sort. + [N | _] = emqx:running_nodes(), + N + end. + %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. on_leave_clean() -> on_leave_clean(node()). @@ -500,10 +512,11 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> end. stale_view_of_cluster_msg(Meta, Count) -> + Node = find_leader(), Reason = Meta#{ msg => stale_view_of_cluster, retry_times => Count, - suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time" + suggestion => ?SUGGESTION(Node) }, ?SLOG(warning, Reason), {error, Reason}. @@ -537,7 +550,7 @@ transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). trans_status() -> - mnesia:foldl( + List = mnesia:foldl( fun(Rec, Acc) -> #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, case mnesia:read(?CLUSTER_MFA, TnxId) of @@ -560,6 +573,12 @@ trans_status() -> end, [], ?CLUSTER_COMMIT + ), + lists:sort( + fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) -> + {IdA, NA} > {IdB, NB} + end, + List ). trans_query(TnxId) -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index e873804de..7d044a36b 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -96,15 +96,23 @@ admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); admins(["fix"]) -> + {atomic, Status} = emqx_cluster_rpc:status(), case mria_rlog:role() of core -> - {atomic, Status} = emqx_cluster_rpc:status(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), maybe_fix_inconsistent(Status, #{fix => true}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> - emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role]) + Core = + case find_highest_node(Status) of + {same_tnx_id, _TnxId} -> + {ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD), + Node; + {ok, Node} -> + Node + end, + emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role]) end; admins(["fast_forward"]) -> status(), @@ -128,7 +136,14 @@ admins(_) -> emqx_ctl:usage(usage_sync()). fix_inconsistent_with_raw(Node, Keys) -> - Confs = [#{Key => emqx_conf_proto_v4:get_raw_config(Node, Key)} || Key <- Keys], + Confs = lists:foldl( + fun(Key, Acc) -> + KeyRaw = atom_to_binary(Key), + Acc#{KeyRaw => emqx_conf_proto_v4:get_raw_config(Node, [Key])} + end, + #{}, + Keys + ), ok = emqx_cluster_rpc:reset(), case load_config_from_raw(Confs, #{mode => replace}) of ok -> waiting_for_fix_finish(); @@ -179,7 +194,7 @@ usage_sync() -> "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fix", "Sync the node with the most comprehensive configuration to other node.\n" - "WARNING: typically the one with the highest tnxid."} + "WARNING: typically the config leader(with the highest tnxid)."} ]. status() -> @@ -210,7 +225,8 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> emqx_ctl:print("Reset tnxid to 0 successfully~n"); inconsistent_tnx_id -> print_tnx_id_status(Status), - emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time"); + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print(?SUGGESTION(Leader)); {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), @@ -223,7 +239,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> ), emqx_ctl:warning( "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" - " is allowed but not recommended. " + " is allowed but not recommended.~n" ), Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"), ok; diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index c2a4ca404..5dd005c02 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -148,7 +148,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> retry_times := 2, cluster_tnx_id := 2, node_tnx_id := 1, - suggested := _ + suggestion := _ }}}, Res1 ), diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index b31140be8..f2c74968e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -314,15 +314,15 @@ global_zone_configs(get, _Params, _Req) -> {200, get_zones()}; global_zone_configs(put, #{body := Body}, _Req) -> PrevZones = get_zones(), - Res = + {Res, Error} = maps:fold( - fun(Path, Value, Acc) -> + fun(Path, Value, {Acc, Error}) -> PrevValue = maps:get(Path, PrevZones), case Value =/= PrevValue of true -> case emqx_conf:update([Path], Value, ?OPTS) of {ok, #{raw_config := RawConf}} -> - Acc#{Path => RawConf}; + {Acc#{Path => RawConf}, Error}; {error, Reason} -> ?SLOG(error, #{ msg => "update_global_zone_failed", @@ -330,18 +330,18 @@ global_zone_configs(put, #{body := Body}, _Req) -> path => Path, value => Value }), - Acc + {Acc, Error#{Path => Reason}} end; false -> - Acc#{Path => Value} + {Acc#{Path => Value}, Error} end end, - #{}, + {#{}, #{}}, Body ), case maps:size(Res) =:= maps:size(Body) of true -> {200, Res}; - false -> {400, #{code => 'UPDATE_FAILED'}} + false -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Error)}} end. config_reset(post, _Params, Req) ->