chore: apply suggestions from code review

Co-authored-by: zmstone <zmstone@gmail.com>
This commit is contained in:
zhongwencool 2024-07-04 15:20:11 +08:00
parent bdf3fc63a6
commit 298211d101
2 changed files with 25 additions and 19 deletions

View File

@ -40,7 +40,7 @@
io_lib:format( io_lib:format(
"run `./bin/emqx_ctl conf cluster_sync fix`" "run `./bin/emqx_ctl conf cluster_sync fix`"
" on ~p(config leader) to force sync the configs, " " on ~p(config leader) to force sync the configs, "
"when this node is lagging for more than 3 minutes,", "if this node has been lagging for more than 3 minutes.",
[Node] [Node]
) )
) )

View File

@ -100,7 +100,7 @@ admins(["fix"]) ->
case mria_rlog:role() of case mria_rlog:role() of
core -> core ->
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
maybe_fix_inconsistent(Status, #{fix => true}), maybe_fix_lagging(Status, #{fix => true}),
StoppedNodes =/= [] andalso StoppedNodes =/= [] andalso
emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]);
Role -> Role ->
@ -135,7 +135,7 @@ admins(["fast_forward", Node0, ToTnxId]) ->
admins(_) -> admins(_) ->
emqx_ctl:usage(usage_sync()). emqx_ctl:usage(usage_sync()).
fix_inconsistent_with_raw(Node, Keys) -> fix_lagging_with_raw(Node, Keys) ->
Confs = lists:foldl( Confs = lists:foldl(
fun(Key, Acc) -> fun(Key, Acc) ->
KeyRaw = atom_to_binary(Key), KeyRaw = atom_to_binary(Key),
@ -204,17 +204,17 @@ status() ->
status(Status) -> status(Status) ->
emqx_ctl:print("-----------------------------------------------\n"), emqx_ctl:print("-----------------------------------------------\n"),
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
maybe_fix_inconsistent(Status, #{fix => false}), maybe_fix_lagging(Status, #{fix => false}),
StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]),
emqx_ctl:print("-----------------------------------------------\n"). emqx_ctl:print("-----------------------------------------------\n").
maybe_fix_inconsistent(Status, #{fix := Fix}) -> maybe_fix_lagging(Status, #{fix := Fix}) ->
%% find inconsistent in conf, but fix in raw way. %% find inconsistent in conf, but fix in raw way.
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
AllConfs = find_running_confs(), AllConfs = find_running_confs(),
case find_inconsistent(Status, AllConfs) of case find_lagging(Status, AllConfs) of
{inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix ->
_ = fix_inconsistent_with_raw(Target, InconsistentKeys), _ = fix_lagging_with_raw(Target, InconsistentKeys),
ok; ok;
{inconsistent_tnx_id_key, Target, InconsistentKeys} -> {inconsistent_tnx_id_key, Target, InconsistentKeys} ->
emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
@ -230,7 +230,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) ->
{inconsistent_key, TnxId, InconsistentKeys} -> {inconsistent_key, TnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs, [{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [ emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [
TnxId TnxId
]), ]),
emqx_ctl:warning( emqx_ctl:warning(
@ -663,7 +663,10 @@ waiting_for_fix_finish() ->
waiting_for_sync_finish(1). waiting_for_sync_finish(1).
waiting_for_sync_finish(10) -> waiting_for_sync_finish(10) ->
emqx_ctl:warning("waiting_for sync timeout(maybe failed) 10s ~n"); emqx_ctl:warning(
"Config is still not in sync after 10s.~n"
"It may need more time, and check the logs in the lagging nodes.~n"
);
waiting_for_sync_finish(Sec) -> waiting_for_sync_finish(Sec) ->
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of
@ -676,7 +679,7 @@ waiting_for_sync_finish(Sec) ->
waiting_for_sync_finish(Sec + 1) waiting_for_sync_finish(Sec + 1)
end. end.
find_inconsistent(Status, AllConfs) -> find_lagging(Status, AllConfs) ->
case find_highest_node(Status) of case find_highest_node(Status) of
{same_tnx_id, TnxId} -> {same_tnx_id, TnxId} ->
%% check the conf is the same or not %% check the conf is the same or not
@ -684,7 +687,7 @@ find_inconsistent(Status, AllConfs) ->
case find_inconsistent_key(TargetConf, OtherConfs) of case find_inconsistent_key(TargetConf, OtherConfs) of
[] -> [] ->
Msg = Msg =
<<"All configuration has already been synchronized(", <<"All configuration synchronized(tnx_id=",
(integer_to_binary(TnxId))/binary, ") successfully\n">>, (integer_to_binary(TnxId))/binary, ") successfully\n">>,
{consistent, Msg}; {consistent, Msg};
InconsistentKeys -> InconsistentKeys ->
@ -789,10 +792,13 @@ print_inconsistent_conf(New, Old, Options) ->
target := {Target, TargetTnxId}, target := {Target, TargetTnxId},
node := {Node, NodeTnxId} node := {Node, NodeTnxId}
} = Options, } = Options,
emqx_ctl:print("~ts(~w)'s ~s is diff from ~ts(~w).~n", [ emqx_ctl:print("~ts(tnx_id=~w)'s ~s is diff from ~ts(tnx_id=~w).~n", [
Node, NodeTnxId, Key, Target, TargetTnxId Node, NodeTnxId, Key, Target, TargetTnxId
]), ]),
print_hocon(#{Node => Old, Target => New}). emqx_ctl:print("~ts:~n", [Node]),
print_hocon(Old),
emqx_ctl:print("~ts:~n", [Target]),
print_hocon(New).
print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} ->
#{ #{
@ -864,7 +870,7 @@ find_inconsistent_test() ->
{node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}}
], ],
?assertEqual( ?assertEqual(
{inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_inconsistent(Status, Confs0) {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0)
), ),
%% conf is the same, no changed %% conf is the same, no changed
@ -873,7 +879,7 @@ find_inconsistent_test() ->
{node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}
], ],
?assertEqual(inconsistent_tnx_id, find_inconsistent(Status, NoDiffConfs)), ?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)),
%% same tnx_id %% same tnx_id
SameStatus = [ SameStatus = [
@ -883,13 +889,13 @@ find_inconsistent_test() ->
], ],
%% same conf %% same conf
?assertEqual( ?assertEqual(
{consistent, <<"All configuration has already been synchronized(3) successfully\n">>}, {consistent, <<"All configuration synchronized(tnx_id=3) successfully\n">>},
find_inconsistent(SameStatus, NoDiffConfs) find_lagging(SameStatus, NoDiffConfs)
), ),
%% diff conf same tnx_id use the first one %% diff conf same tnx_id use the first one
?assertEqual( ?assertEqual(
{inconsistent_key, 3, [<<"mqtt">>]}, {inconsistent_key, 3, [<<"mqtt">>]},
find_inconsistent(SameStatus, Confs0) find_lagging(SameStatus, Confs0)
), ),
ok. ok.