diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 6513516d4..029907f57 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -50,7 +50,7 @@ -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). -define(TIMEOUT_NODE_START_MS, 15000). --define(TIMEOUT_APPS_START_MS, 60000). +-define(TIMEOUT_APPS_START_MS, 30000). -define(TIMEOUT_NODE_STOP_S, 15). %% diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 7b5208f06..e88206ccd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -73,16 +73,15 @@ -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). -define(APPSPECS, [ - emqx_conf, emqx, + emqx_conf, emqx_auth, emqx_auth_mnesia, emqx_management, emqx_connector, emqx_bridge_http, - emqx_bridge, - {emqx_rule_engine, "rule_engine { rules {} }"}, - {emqx_bridge, "bridges {}"} + {emqx_bridge, "actions {}\n bridges {}"}, + {emqx_rule_engine, "rule_engine { rules {} }"} ]). -define(APPSPEC_DASHBOARD, @@ -120,10 +119,10 @@ end_per_suite(_Config) -> ok. init_per_group(cluster = Name, Config) -> - Nodes = [NodePrimary | _] = mk_cluster(Config), + Nodes = [NodePrimary | _] = mk_cluster(Name, Config), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(cluster_later_join = Name, Config) -> - Nodes = [NodePrimary | _] = mk_cluster(Config, #{join_to => undefined}), + Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(_Name, Config) -> WorkDir = emqx_cth_suite:work_dir(Config), @@ -135,10 +134,10 @@ init_api(Config) -> {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), [{api, App} | Config]. -mk_cluster(Config) -> - mk_cluster(Config, #{}). +mk_cluster(Name, Config) -> + mk_cluster(Name, Config, #{}). -mk_cluster(Config, Opts) -> +mk_cluster(Name, Config, Opts) -> Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], Node2Apps = ?APPSPECS, emqx_cth_cluster:start( @@ -146,7 +145,7 @@ mk_cluster(Config, Opts) -> {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}}, {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}} ], - #{work_dir => emqx_cth_suite:work_dir(Config)} + #{work_dir => emqx_cth_suite:work_dir(Name, Config)} ). end_per_group(Group, Config) when @@ -162,7 +161,7 @@ init_per_testcase(t_broken_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 1, -1), meck:expect(emqx_bpapi, supported_version, 2, -1), - init_per_testcase(commong, Config); + init_per_testcase(common, Config); init_per_testcase(t_old_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 1, 1), @@ -188,6 +187,18 @@ end_per_testcase(_, Config) -> ok. clear_resources() -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + ok = emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + lists:foreach( + fun(#{type := Type, name := Name}) -> + ok = emqx_connector:remove(Type, Name) + end, + emqx_connector:list() + ), lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_bridge:remove(Type, Name) @@ -1314,6 +1325,7 @@ t_cluster_later_join_metrics(Config) -> BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ?check_trace( + #{timetrap => 15_000}, begin %% Create a bridge on only one of the nodes. ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)), @@ -1325,24 +1337,28 @@ t_cluster_later_join_metrics(Config) -> }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), + + ct:print("node joining cluster"), %% Now join the other node join with the api node. ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), - %% Check metrics; shouldn't crash even if the bridge is not - %% ready on the node that just joined the cluster. + %% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the + %% applications, in particular `emqx_conf' doesn't restart and synchronize the + %% transaction id. It's also unclear at the moment why the equivalent test in + %% `emqx_bridge_v2_api_SUITE' doesn't need this hack. + ok = erpc:call(OtherNode, application, stop, [emqx_conf]), + ok = erpc:call(OtherNode, application, start, [emqx_conf]), + ct:print("node joined cluster"), %% assert: wait for the bridge to be ready on the other node. - fun - WaitConfSync(0) -> - throw(waiting_config_sync_timeout); - WaitConfSync(N) -> - timer:sleep(1000), - case erpc:call(OtherNode, emqx_bridge, list, []) of - [] -> WaitConfSync(N - 1); - [_] -> ok - end - end( - 60 - ), + {_, {ok, _}} = + ?wait_async_action( + {emqx_cluster_rpc, OtherNode} ! wake_up, + #{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}}, + 10_000 + ), + + %% Check metrics; shouldn't crash even if the bridge is not + %% ready on the node that just joined the cluster. ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 8758c325d..83a857b47 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -185,7 +185,7 @@ mk_cluster(Name, Config, Opts) -> {emqx_bridge_v2_api_SUITE_1, Opts#{role => core, apps => Node1Apps}}, {emqx_bridge_v2_api_SUITE_2, Opts#{role => core, apps => Node2Apps}} ], - #{work_dir => filename:join(?config(priv_dir, Config), Name)} + #{work_dir => emqx_cth_suite:work_dir(Name, Config)} ). end_per_group(Group, Config) when diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 5bc330afa..756a5ec30 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -66,6 +66,7 @@ -boot_mnesia({mnesia, [boot]}). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_conf.hrl"). -ifdef(TEST). @@ -384,6 +385,7 @@ catch_up(State) -> catch_up(State, false). catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) -> case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of {atomic, caught_up} -> + ?tp(cluster_rpc_caught_up, #{}), ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),