From 457ea93570aa3dab1cfa4770b77dd50b256f5998 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 5 Jul 2024 14:10:04 +0800 Subject: [PATCH] test: add cluster_sync cli test --- apps/emqx_conf/src/emqx_conf_cli.erl | 22 ++- .../test/emqx_conf_cluster_sync_SUITE.erl | 158 ++++++++++++++++++ 2 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index d332d31d7..864d8e08a 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -26,7 +26,7 @@ conf/1, audit/3, unload/0, - mark_fix_log/1 + mark_fix_log/2 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). @@ -103,7 +103,8 @@ admins(["fix"]) -> #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), maybe_fix_lagging(Status, #{fix => true}), StoppedNodes =/= [] andalso - emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); + emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), + ok; Role -> Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role]) @@ -153,8 +154,8 @@ mark_fix_begin(Node, TnxId) -> MFA = {?MODULE, mark_fix_log, [Status]}, emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). -mark_fix_log(Status) -> - ?SLOG(warning, #{msg => cluster_fix_log, status => Status}), +mark_fix_log(Status, Opts) -> + ?SLOG(warning, #{msg => cluster_fix_log, status => Status, opts => Opts}), ok. audit(Level, From, Log) -> @@ -226,11 +227,16 @@ maybe_fix_lagging(Status, #{fix := Fix}) -> {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); - {inconsistent_tnx_id, Target, ToTnxId} when Fix -> + {inconsistent_tnx_id, ToTnxId, Target} when Fix -> print_tnx_id_status(Status), - ok = mark_fix_begin(Target, ToTnxId), - emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); - {inconsistent_tnx_id, _Target, _ToTnxId} -> + case mark_fix_begin(Target, ToTnxId) of + ok -> + waiting_for_fix_finish(), + emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); + Error -> + Error + end; + {inconsistent_tnx_id, _ToTnxId, _Target} -> print_tnx_id_status(Status), Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print(?SUGGESTION(Leader)); diff --git a/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl new file mode 100644 index 000000000..093fe591f --- /dev/null +++ b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_cluster_sync_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_conf.hrl"). + +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = ?config(priv_dir, Config), + Cluster = mk_cluster_spec(#{}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), + [{cluster_nodes, Nodes} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)). + +t_fix(Config) -> + [Node1, Node2] = ?config(cluster_nodes, Config), + ?ON(Node1, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node2, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 100}, #{})), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + ?ON( + Node1, + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + ), + %% fix normal, nothing changed + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + %% fix inconsistent_key. tnx_id is the same, so nothing changed. + emqx_conf_proto_v4:update(Node1, [<<"mqtt">>], #{<<"max_topic_levels">> => 99}, #{}), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id_key. tnx_id and key are updated. + ?ON(Node1, fake_mfa(2, Node1, {?MODULE, undef, []})), + %% 2 -> fake_mfa, 3-> mark_begin_log, 4-> mqtt 5 -> zones + ?ON(Node2, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 5}, + #{node := Node1, tnx_id := 5} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id. tnx_id is updated. + {ok, _} = ?ON( + Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 98}, #{}) + ), + ?ON(Node2, fake_mfa(7, Node2, {?MODULE, undef1, []})), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + %% unchanged + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ok. + +fake_mfa(TnxId, Node, MFA) -> + Func = fun() -> + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = emqx_cluster_rpc:commit(Node, TnxId) + end, + {atomic, ok} = mria:transaction(?CLUSTER_RPC_SHARD, Func, []), + ok. + +mk_cluster_spec(Opts) -> + Conf = #{ + listeners => #{ + tcp => #{default => <<"marked_for_deletion">>}, + ssl => #{default => <<"marked_for_deletion">>}, + ws => #{default => <<"marked_for_deletion">>}, + wss => #{default => <<"marked_for_deletion">>} + } + }, + Apps = [ + {emqx, #{config => Conf}}, + {emqx_conf, #{config => Conf}} + ], + [ + {emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Apps}}, + {emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Apps}} + ].