From 298211d1016aeda916b441dcc0491f0a876f063b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 4 Jul 2024 15:20:11 +0800 Subject: [PATCH] chore: apply suggestions from code review Co-authored-by: zmstone --- apps/emqx_conf/include/emqx_conf.hrl | 4 +-- apps/emqx_conf/src/emqx_conf_cli.erl | 40 ++++++++++++++++------------ 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 10ac2b929..53f309856 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -39,8 +39,8 @@ lists:flatten( io_lib:format( "run `./bin/emqx_ctl conf cluster_sync fix`" - " on ~p(config leader) to force sync the configs," - "when this node is lagging for more than 3 minutes,", + " on ~p(config leader) to force sync the configs, " + "if this node has been lagging for more than 3 minutes.", [Node] ) ) diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 7d044a36b..76ac83f16 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -100,7 +100,7 @@ admins(["fix"]) -> case mria_rlog:role() of core -> #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - maybe_fix_inconsistent(Status, #{fix => true}), + maybe_fix_lagging(Status, #{fix => true}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> @@ -135,7 +135,7 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). -fix_inconsistent_with_raw(Node, Keys) -> +fix_lagging_with_raw(Node, Keys) -> Confs = lists:foldl( fun(Key, Acc) -> KeyRaw = atom_to_binary(Key), @@ -204,17 +204,17 @@ status() -> status(Status) -> emqx_ctl:print("-----------------------------------------------\n"), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - maybe_fix_inconsistent(Status, #{fix => false}), + maybe_fix_lagging(Status, #{fix => false}), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print("-----------------------------------------------\n"). -maybe_fix_inconsistent(Status, #{fix := Fix}) -> +maybe_fix_lagging(Status, #{fix := Fix}) -> %% find inconsistent in conf, but fix in raw way. %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), - case find_inconsistent(Status, AllConfs) of + case find_lagging(Status, AllConfs) of {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> - _ = fix_inconsistent_with_raw(Target, InconsistentKeys), + _ = fix_lagging_with_raw(Target, InconsistentKeys), ok; {inconsistent_tnx_id_key, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), @@ -230,7 +230,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) -> {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [ + emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [ TnxId ]), emqx_ctl:warning( @@ -663,7 +663,10 @@ waiting_for_fix_finish() -> waiting_for_sync_finish(1). waiting_for_sync_finish(10) -> - emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n"); + emqx_ctl:warning( + "Config is still not in sync after 10s.~n" + "It may need more time, and check the logs in the lagging nodes.~n" + ); waiting_for_sync_finish(Sec) -> {atomic, Status} = emqx_cluster_rpc:status(), case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of @@ -676,7 +679,7 @@ waiting_for_sync_finish(Sec) -> waiting_for_sync_finish(Sec + 1) end. -find_inconsistent(Status, AllConfs) -> +find_lagging(Status, AllConfs) -> case find_highest_node(Status) of {same_tnx_id, TnxId} -> %% check the conf is the same or not @@ -684,7 +687,7 @@ find_inconsistent(Status, AllConfs) -> case find_inconsistent_key(TargetConf, OtherConfs) of [] -> Msg = - <<"All configuration has already been synchronized(", + <<"All configuration synchronized(tnx_id=", (integer_to_binary(TnxId))/binary, ") successfully\n">>, {consistent, Msg}; InconsistentKeys -> @@ -789,10 +792,13 @@ print_inconsistent_conf(New, Old, Options) -> target := {Target, TargetTnxId}, node := {Node, NodeTnxId} } = Options, - emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [ + emqx_ctl:print("~ts(tnx_id=~w)'s ~s is diff from ~ts(tnx_id=~w).~n", [ Node, NodeTnxId, Key, Target, TargetTnxId ]), - print_hocon(#{Node => Old, Target => New}). + emqx_ctl:print("~ts:~n", [Node]), + print_hocon(Old), + emqx_ctl:print("~ts:~n", [Target]), + print_hocon(New). print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> #{ @@ -864,7 +870,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} ], ?assertEqual( - {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0) + {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) ), %% conf is the same, no changed @@ -873,7 +879,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} ], - ?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)), + ?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)), %% same tnx_id SameStatus = [ @@ -883,13 +889,13 @@ find_inconsistent_test() -> ], %% same conf ?assertEqual( - {consistent, <<"All configuration has already been synchronized(3) successfully\n">>}, - find_inconsistent(SameStatus, NoDiffConfs) + {consistent, <<"All configuration synchronized(tnx_id=3) successfully\n">>}, + find_lagging(SameStatus, NoDiffConfs) ), %% diff conf same tnx_id use the first one ?assertEqual( {inconsistent_key, 3, [<<"mqtt">>]}, - find_inconsistent(SameStatus, Confs0) + find_lagging(SameStatus, Confs0) ), ok.