diff --git a/.github/workflows/run_emqx_app_tests.yaml b/.github/workflows/run_emqx_app_tests.yaml index ffed8d0c8..24c3d2b42 100644 --- a/.github/workflows/run_emqx_app_tests.yaml +++ b/.github/workflows/run_emqx_app_tests.yaml @@ -71,7 +71,7 @@ jobs: ./rebar3 xref ./rebar3 dialyzer ./rebar3 eunit -v - ./rebar3 ct -v + ./rebar3 ct -v --readable=true ./rebar3 proper -d test/props - uses: actions/upload-artifact@v3 if: failure() diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index d0c26ceb5..6e03971a5 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -93,7 +93,6 @@ end_per_group(_Group, _Config) -> emqx_common_test_helpers:stop_apps([]). init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), Config. end_per_suite(_Config) -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index e77bef90c..ce02f16c3 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -885,14 +885,24 @@ t_handle_kicked_publish_will_msg(_) -> Self = self(), ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), - Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + ClientId = test, + WillTopic = <<"will_topic">>, + WillPayload = <<"will_payload">>, + Msg = emqx_message:make(ClientId, WillTopic, WillPayload), {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( kick, channel(#{will_msg => Msg}) ), receive - {pub, Msg} -> ok - after 10_000 -> exit(will_message_not_published) + {pub, RecMsg} -> + ?assertEqual(ClientId, RecMsg#message.from, #{msg => Msg}), + ?assertEqual(WillTopic, RecMsg#message.topic, #{msg => Msg}), + ?assertEqual(WillPayload, RecMsg#message.payload, #{msg => Msg}), + ok + after 5_000 -> + ct:pal("expected message: ~p", [Msg]), + ct:pal("~p mailbox: ~p", [?LINE, process_info(self(), messages)]), + exit(will_message_not_published) end. t_handle_call_discard(_) -> diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index f85ed2599..7aaf93c99 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -756,8 +756,8 @@ start_slave(Name, Opts) when is_map(Opts) -> throw(Other) end, pong = net_adm:ping(Node), - setup_node(Node, Opts), ok = snabbkaffe:forward_trace(Node), + setup_node(Node, Opts), Node. %% Node stopping diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index daae15a17..e59259c3e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -17,6 +17,8 @@ -behaviour(application). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([start/2, stop/1]). -export([ @@ -34,6 +36,7 @@ start(_StartType, _StartArgs) -> ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge), + ?tp(emqx_bridge_app_started, #{}), {ok, Sup}. stop(_State) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 814051733..65b88d45b 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -1062,10 +1062,15 @@ do_econnrefused_or_timeout_test(Config, Error) -> fun(Trace) -> case Error of econnrefused -> - ?assertMatch( - [#{reason := Error, connector := ResourceId} | _], - ?of_kind(gcp_pubsub_request_failed, Trace) - ); + case ?of_kind(gcp_pubsub_request_failed, Trace) of + [#{reason := Error, connector := ResourceId} | _] -> + ok; + [#{reason := {closed, _Msg}, connector := ResourceId} | _] -> + %% _Msg = "The connection was lost." + ok; + Trace0 -> + error({unexpected_trace, Trace0}) + end; timeout -> ?assertMatch( [_, _ | _], diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 7e7acbcd5..33c207c39 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2078,7 +2078,7 @@ t_resource_manager_crash_after_subscriber_started(Config) -> 10_000 ), case Res of - {error, {config_update_crashed, {killed, _}}} -> + {error, {config_update_crashed, _}} -> ok; {ok, _} -> %% the new manager may have had time to startup @@ -2135,7 +2135,7 @@ t_resource_manager_crash_before_subscriber_started(Config) -> 10_000 ), case Res of - {error, {config_update_crashed, {killed, _}}} -> + {error, {config_update_crashed, _}} -> ok; {ok, _} -> %% the new manager may have had time to startup diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index f280c51d6..721beab6e 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -203,8 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) -> " pool_size = 1\n" " sql = \"~s\"\n" " resource_opts = {\n" - " auto_restart_interval = 5000\n" - " request_timeout = 30000\n" + " auto_restart_interval = \"5s\"\n" + " health_check_interval = \"5s\"\n" + " request_timeout = \"30s\"\n" " query_mode = \"async\"\n" " enable_batch = true\n" " batch_size = 3\n" @@ -233,6 +234,11 @@ resource_id(Config) -> Name = ?config(oracle_name, Config), emqx_bridge_resource:resource_id(Type, Name). +bridge_id(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + emqx_bridge_resource:bridge_id(Type, Name). + create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). @@ -361,6 +367,7 @@ t_batch_sync_query(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), ResourceId = resource_id(Config), + BridgeId = bridge_id(Config), ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), @@ -380,12 +387,12 @@ t_batch_sync_query(Config) -> % Send 3 async messages while resource is down. When it comes back, these messages % will be delivered in sync way. If we try to send sync messages directly, it will % be sent async as callback_mode is set to async_if_possible. - Message = {send_message, Params}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(1000), - emqx_resource:query(ResourceId, Message), - emqx_resource:query(ResourceId, Message), - emqx_resource:query(ResourceId, Message) + emqx_bridge:send_message(BridgeId, Params), + emqx_bridge:send_message(BridgeId, Params), + emqx_bridge:send_message(BridgeId, Params), + ok end), ?retry( _Sleep = 1_000, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index a5c04160c..9dc2f05d6 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -1064,9 +1064,17 @@ t_cluster(Config) -> ?check_trace( begin Nodes = [N1, N2 | _] = start_cluster(Cluster), + %% wait until bridge app supervisor is up; by that point, + %% `emqx_config_handler:add_handler' has been called and the node should be + %% ready to create bridges. + NumNodes = length(Nodes), + {ok, _} = snabbkaffe:block_until( + ?match_n_events(NumNodes, #{?snk_kind := emqx_bridge_app_started}), + 15_000 + ), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := pulsar_producer_bridge_started}), - length(Nodes), + NumNodes, 15_000 ), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index e497e0a47..707aa47ea 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -102,7 +102,6 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), Config. end_per_suite(_Config) ->