chore: combine some common code into one function

This commit is contained in:
zhongwencool 2024-06-11 22:19:54 +08:00
parent 5b105fcdbb
commit 22fc3c49cc
3 changed files with 47 additions and 46 deletions

View File

@ -503,7 +503,7 @@ stale_view_of_cluster_msg(Meta, Count) ->
Reason = Meta#{ Reason = Meta#{
msg => stale_view_of_cluster, msg => stale_view_of_cluster,
retry_times => Count, retry_times => Count,
suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time"
}, },
?SLOG(warning, Reason), ?SLOG(warning, Reason),
{error, Reason}. {error, Reason}.

View File

@ -41,7 +41,6 @@
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
-define(TIMEOUT, 30000). -define(TIMEOUT, 30000).
-dialyzer({no_match, [load/0]}). -dialyzer({no_match, [load/0]}).
load() -> load() ->
@ -97,32 +96,16 @@ admins(["inspect", TnxId0]) ->
TnxId = list_to_integer(TnxId0), TnxId = list_to_integer(TnxId0),
print(emqx_cluster_rpc:query(TnxId)); print(emqx_cluster_rpc:query(TnxId));
admins(["fix"]) -> admins(["fix"]) ->
case mria_rlog:role() of
core ->
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
%% find inconsistent in conf, but fix in raw way.
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
AllConfs = find_running_confs(), maybe_fix_inconsistent(Status, #{fix => true}),
case find_inconsistent(Status, AllConfs) of StoppedNodes =/= [] andalso
{inconsistent_tnx_id_key, Target, InconsistentKeys} -> emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]);
_ = fix_inconsistent_with_raw(Target, InconsistentKeys), Role ->
ok; emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role])
inconsistent_tnx_id -> end;
print_tnx_id_status(Status),
ok = emqx_cluster_rpc:reset(),
emqx_ctl:print("Reset tnxid to 0 successfully~n");
{inconsistent_key, TnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]),
emqx_ctl:warning(
"but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n",
[InconsistentKeys]
),
emqx_ctl:warning("This is normal. This fix will not make any changes.~n");
{error, Reason} ->
emqx_ctl:print(Reason)
end,
StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]);
admins(["fast_forward"]) -> admins(["fast_forward"]) ->
status(), status(),
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
@ -206,33 +189,47 @@ status() ->
status(Status) -> status(Status) ->
emqx_ctl:print("-----------------------------------------------\n"), emqx_ctl:print("-----------------------------------------------\n"),
#{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
maybe_fix_inconsistent(Status, #{fix => false}),
StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]),
emqx_ctl:print("-----------------------------------------------\n").
maybe_fix_inconsistent(Status, #{fix := Fix}) ->
%% find inconsistent in conf, but fix in raw way.
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
AllConfs = find_running_confs(), AllConfs = find_running_confs(),
case find_inconsistent(Status, AllConfs) of case find_inconsistent(Status, AllConfs) of
{inconsistent_tnx_id_key, TargetNode, InconsistentKeys} -> {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix ->
_ = fix_inconsistent_with_raw(Target, InconsistentKeys),
ok;
{inconsistent_tnx_id_key, Target, InconsistentKeys} ->
emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs); print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs);
inconsistent_tnx_id when Fix ->
print_tnx_id_status(Status),
ok = emqx_cluster_rpc:reset(),
emqx_ctl:print("Reset tnxid to 0 successfully~n");
inconsistent_tnx_id -> inconsistent_tnx_id ->
print_tnx_id_status(Status), print_tnx_id_status(Status),
emqx_ctl:print( emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time");
"run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time"
);
{inconsistent_key, TnxId, InconsistentKeys} -> {inconsistent_key, TnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs, [{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [
TnxId
]),
emqx_ctl:warning( emqx_ctl:warning(
"but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n",
[InconsistentKeys] [InconsistentKeys]
), ),
emqx_ctl:warning( emqx_ctl:warning(
"Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf"
" is allowed but not recommended.~n" " is allowed but not recommended. "
); ),
{error, Reason} -> Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"),
emqx_ctl:print(Reason) ok;
end, {consistent, Msg} ->
StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print(Msg)
emqx_ctl:print("-----------------------------------------------\n"). end.
print_tnx_id_status(List0) -> print_tnx_id_status(List0) ->
emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"),
@ -658,7 +655,7 @@ waiting_for_sync_finish(Sec) ->
emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]);
_ -> _ ->
Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]),
emqx_ctl:warning("sync: ~n", [Res]), emqx_ctl:warning("sync status: ~p~n", [Res]),
timer:sleep(1000), timer:sleep(1000),
waiting_for_sync_finish(Sec + 1) waiting_for_sync_finish(Sec + 1)
end. end.
@ -673,7 +670,7 @@ find_inconsistent(Status, AllConfs) ->
Msg = Msg =
<<"All configuration has already been synchronized(", <<"All configuration has already been synchronized(",
(integer_to_binary(TnxId))/binary, ") successfully\n">>, (integer_to_binary(TnxId))/binary, ") successfully\n">>,
{error, Msg}; {consistent, Msg};
InconsistentKeys -> InconsistentKeys ->
{inconsistent_key, TnxId, InconsistentKeys} {inconsistent_key, TnxId, InconsistentKeys}
end; end;
@ -762,7 +759,7 @@ print_inconsistent_conf(New = #{}, Old = #{}, Options) ->
removed := Removed, removed := Removed,
changed := Changed changed := Changed
} = emqx_utils_maps:diff_maps(New, Old), } = emqx_utils_maps:diff_maps(New, Old),
RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n", RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they are still present on ~ts(~w).~n",
print_inconsistent(Removed, RemovedFmt, Options), print_inconsistent(Removed, RemovedFmt, Options),
AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n",
print_inconsistent(Added, AddedFmt, Options), print_inconsistent(Added, AddedFmt, Options),
@ -870,7 +867,7 @@ find_inconsistent_test() ->
], ],
%% same conf %% same conf
?assertEqual( ?assertEqual(
{error, <<"All configuration has already been synchronized(3) successfully\n">>}, {consistent, <<"All configuration has already been synchronized(3) successfully\n">>},
find_inconsistent(SameStatus, NoDiffConfs) find_inconsistent(SameStatus, NoDiffConfs)
), ),
%% diff conf same tnx_id use the first one %% diff conf same tnx_id use the first one

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
sync_data_from_node/1, sync_data_from_node/1,
get_config/2, get_config/2,
get_config/3, get_config/3,
@ -43,6 +44,9 @@
introduced_in() -> introduced_in() ->
"5.1.1". "5.1.1".
deprecated_since() ->
"5.7.1".
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
sync_data_from_node(Node) -> sync_data_from_node(Node) ->
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).