From 22fc3c49cc30d6a470c4f7d4494e784fafbe6a74 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 11 Jun 2024 22:19:54 +0800 Subject: [PATCH] chore: combine some common code into one function --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 2 +- apps/emqx_conf/src/emqx_conf_cli.erl | 87 +++++++++---------- .../src/proto/emqx_conf_proto_v3.erl | 4 + 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index a2c0a2478..6c90cf829 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -503,7 +503,7 @@ stale_view_of_cluster_msg(Meta, Count) -> Reason = Meta#{ msg => stale_view_of_cluster, retry_times => Count, - suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" + suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time" }, ?SLOG(warning, Reason), {error, Reason}. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 09bf3ca3d..e873804de 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -41,7 +41,6 @@ -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). -define(TIMEOUT, 30000). - -dialyzer({no_match, [load/0]}). load() -> @@ -97,32 +96,16 @@ admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); admins(["fix"]) -> - {atomic, Status} = emqx_cluster_rpc:status(), - %% find inconsistent in conf, but fix in raw way. - %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) - #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), - AllConfs = find_running_confs(), - case find_inconsistent(Status, AllConfs) of - {inconsistent_tnx_id_key, Target, InconsistentKeys} -> - _ = fix_inconsistent_with_raw(Target, InconsistentKeys), - ok; - inconsistent_tnx_id -> - print_tnx_id_status(Status), - ok = emqx_cluster_rpc:reset(), - emqx_ctl:print("Reset tnxid to 0 successfully~n"); - {inconsistent_key, TnxId, InconsistentKeys} -> - [{Target, _} | _] = AllConfs, - print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), - emqx_ctl:warning( - "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", - [InconsistentKeys] - ), - emqx_ctl:warning("This is normal. This fix will not make any changes.~n"); - {error, Reason} -> - emqx_ctl:print(Reason) - end, - StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]); + case mria_rlog:role() of + core -> + {atomic, Status} = emqx_cluster_rpc:status(), + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_inconsistent(Status, #{fix => true}), + StoppedNodes =/= [] andalso + emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); + Role -> + emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role]) + end; admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -206,33 +189,47 @@ status() -> status(Status) -> emqx_ctl:print("-----------------------------------------------\n"), #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_inconsistent(Status, #{fix => false}), + StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), + emqx_ctl:print("-----------------------------------------------\n"). + +maybe_fix_inconsistent(Status, #{fix := Fix}) -> + %% find inconsistent in conf, but fix in raw way. + %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), case find_inconsistent(Status, AllConfs) of - {inconsistent_tnx_id_key, TargetNode, InconsistentKeys} -> + {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> + _ = fix_inconsistent_with_raw(Target, InconsistentKeys), + ok; + {inconsistent_tnx_id_key, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), - print_inconsistent_conf(InconsistentKeys, TargetNode, Status, AllConfs); + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); + inconsistent_tnx_id when Fix -> + print_tnx_id_status(Status), + ok = emqx_cluster_rpc:reset(), + emqx_ctl:print("Reset tnxid to 0 successfully~n"); inconsistent_tnx_id -> print_tnx_id_status(Status), - emqx_ctl:print( - "run `./bin/emqx_ctl conf cluster_sync fix` when not restored for a long time" - ); + emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time"); {inconsistent_key, TnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), - emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [TnxId]), + emqx_ctl:warning("All configuration has already been synchronized(~w)~n", [ + TnxId + ]), emqx_ctl:warning( - "but we find inconsistent keys: ~p, which come from environment variables or etc/emqx.conf.~n", + "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n", [InconsistentKeys] ), emqx_ctl:warning( "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" - " is allowed but not recommended.~n" - ); - {error, Reason} -> - emqx_ctl:print(Reason) - end, - StoppedNodes =/= [] andalso emqx_ctl:warning("Find stopped nodes: ~p~n", [StoppedNodes]), - emqx_ctl:print("-----------------------------------------------\n"). + " is allowed but not recommended. " + ), + Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"), + ok; + {consistent, Msg} -> + emqx_ctl:print(Msg) + end. print_tnx_id_status(List0) -> emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), @@ -658,7 +655,7 @@ waiting_for_sync_finish(Sec) -> emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); _ -> Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), - emqx_ctl:warning("sync: ~n", [Res]), + emqx_ctl:warning("sync status: ~p~n", [Res]), timer:sleep(1000), waiting_for_sync_finish(Sec + 1) end. @@ -673,7 +670,7 @@ find_inconsistent(Status, AllConfs) -> Msg = <<"All configuration has already been synchronized(", (integer_to_binary(TnxId))/binary, ") successfully\n">>, - {error, Msg}; + {consistent, Msg}; InconsistentKeys -> {inconsistent_key, TnxId, InconsistentKeys} end; @@ -762,7 +759,7 @@ print_inconsistent_conf(New = #{}, Old = #{}, Options) -> removed := Removed, changed := Changed } = emqx_utils_maps:diff_maps(New, Old), - RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they still function on ~ts(~w).~n", + RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they are still present on ~ts(~w).~n", print_inconsistent(Removed, RemovedFmt, Options), AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", print_inconsistent(Added, AddedFmt, Options), @@ -870,7 +867,7 @@ find_inconsistent_test() -> ], %% same conf ?assertEqual( - {error, <<"All configuration has already been synchronized(3) successfully\n">>}, + {consistent, <<"All configuration has already been synchronized(3) successfully\n">>}, find_inconsistent(SameStatus, NoDiffConfs) ), %% diff conf same tnx_id use the first one diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index 14d578ec1..8e5421d57 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, sync_data_from_node/1, get_config/2, get_config/3, @@ -43,6 +44,9 @@ introduced_in() -> "5.1.1". +deprecated_since() -> + "5.7.1". + -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). sync_data_from_node(Node) -> rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).