Merge pull request #10890 from thalesmg/test-flakiness-20230530-e

more attempts to fix more test flakiness
This commit is contained in:
Thales Macedo Garitezi 2023-05-31 16:07:18 -03:00 committed by GitHub
commit b7c72e6cfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 51 additions and 20 deletions

View File

@@ -71,7 +71,7 @@ jobs:
./rebar3 xref ./rebar3 xref
./rebar3 dialyzer ./rebar3 dialyzer
./rebar3 eunit -v ./rebar3 eunit -v
./rebar3 ct -v ./rebar3 ct -v --readable=true
./rebar3 proper -d test/props ./rebar3 proper -d test/props
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
if: failure() if: failure()

View File

@@ -93,7 +93,6 @@ end_per_group(_Group, _Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_common_test_helpers:stop_apps([]).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@@ -885,14 +885,24 @@ t_handle_kicked_publish_will_msg(_) ->
Self = self(), Self = self(),
ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), ClientId = test,
WillTopic = <<"will_topic">>,
WillPayload = <<"will_payload">>,
Msg = emqx_message:make(ClientId, WillTopic, WillPayload),
{shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call(
kick, channel(#{will_msg => Msg}) kick, channel(#{will_msg => Msg})
), ),
receive receive
{pub, Msg} -> ok {pub, RecMsg} ->
after 10_000 -> exit(will_message_not_published) ?assertEqual(ClientId, RecMsg#message.from, #{msg => Msg}),
?assertEqual(WillTopic, RecMsg#message.topic, #{msg => Msg}),
?assertEqual(WillPayload, RecMsg#message.payload, #{msg => Msg}),
ok
after 5_000 ->
ct:pal("expected message: ~p", [Msg]),
ct:pal("~p mailbox: ~p", [?LINE, process_info(self(), messages)]),
exit(will_message_not_published)
end. end.
t_handle_call_discard(_) -> t_handle_call_discard(_) ->

View File

@@ -756,8 +756,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
throw(Other) throw(Other)
end, end,
pong = net_adm:ping(Node), pong = net_adm:ping(Node),
setup_node(Node, Opts),
ok = snabbkaffe:forward_trace(Node), ok = snabbkaffe:forward_trace(Node),
setup_node(Node, Opts),
Node. Node.
%% Node stopping %% Node stopping

View File

@@ -17,6 +17,8 @@
-behaviour(application). -behaviour(application).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([start/2, stop/1]). -export([start/2, stop/1]).
-export([ -export([
@@ -34,6 +36,7 @@ start(_StartType, _StartArgs) ->
ok = emqx_bridge:load_hook(), ok = emqx_bridge:load_hook(),
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge), ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
?tp(emqx_bridge_app_started, #{}),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@@ -1062,10 +1062,15 @@ do_econnrefused_or_timeout_test(Config, Error) ->
fun(Trace) -> fun(Trace) ->
case Error of case Error of
econnrefused -> econnrefused ->
?assertMatch( case ?of_kind(gcp_pubsub_request_failed, Trace) of
[#{reason := Error, connector := ResourceId} | _], [#{reason := Error, connector := ResourceId} | _] ->
?of_kind(gcp_pubsub_request_failed, Trace) ok;
); [#{reason := {closed, _Msg}, connector := ResourceId} | _] ->
%% _Msg = "The connection was lost."
ok;
Trace0 ->
error({unexpected_trace, Trace0})
end;
timeout -> timeout ->
?assertMatch( ?assertMatch(
[_, _ | _], [_, _ | _],

View File

@@ -2078,7 +2078,7 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
10_000 10_000
), ),
case Res of case Res of
{error, {config_update_crashed, {killed, _}}} -> {error, {config_update_crashed, _}} ->
ok; ok;
{ok, _} -> {ok, _} ->
%% the new manager may have had time to startup %% the new manager may have had time to startup
@@ -2135,7 +2135,7 @@ t_resource_manager_crash_before_subscriber_started(Config) ->
10_000 10_000
), ),
case Res of case Res of
{error, {config_update_crashed, {killed, _}}} -> {error, {config_update_crashed, _}} ->
ok; ok;
{ok, _} -> {ok, _} ->
%% the new manager may have had time to startup %% the new manager may have had time to startup

View File

@@ -203,8 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) ->
" pool_size = 1\n" " pool_size = 1\n"
" sql = \"~s\"\n" " sql = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" auto_restart_interval = 5000\n" " auto_restart_interval = \"5s\"\n"
" request_timeout = 30000\n" " health_check_interval = \"5s\"\n"
" request_timeout = \"30s\"\n"
" query_mode = \"async\"\n" " query_mode = \"async\"\n"
" enable_batch = true\n" " enable_batch = true\n"
" batch_size = 3\n" " batch_size = 3\n"
@@ -233,6 +234,11 @@ resource_id(Config) ->
Name = ?config(oracle_name, Config), Name = ?config(oracle_name, Config),
emqx_bridge_resource:resource_id(Type, Name). emqx_bridge_resource:resource_id(Type, Name).
bridge_id(Config) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
emqx_bridge_resource:bridge_id(Type, Name).
create_bridge(Config) -> create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}). create_bridge(Config, _Overrides = #{}).
@@ -361,6 +367,7 @@ t_batch_sync_query(Config) ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config), ResourceId = resource_id(Config),
BridgeId = bridge_id(Config),
?check_trace( ?check_trace(
begin begin
?assertMatch({ok, _}, create_bridge_api(Config)), ?assertMatch({ok, _}, create_bridge_api(Config)),
@@ -380,12 +387,12 @@ t_batch_sync_query(Config) ->
% Send 3 async messages while resource is down. When it comes back, these messages % Send 3 async messages while resource is down. When it comes back, these messages
% will be delivered in sync way. If we try to send sync messages directly, it will % will be delivered in sync way. If we try to send sync messages directly, it will
% be sent async as callback_mode is set to async_if_possible. % be sent async as callback_mode is set to async_if_possible.
Message = {send_message, Params},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(1000), ct:sleep(1000),
emqx_resource:query(ResourceId, Message), emqx_bridge:send_message(BridgeId, Params),
emqx_resource:query(ResourceId, Message), emqx_bridge:send_message(BridgeId, Params),
emqx_resource:query(ResourceId, Message) emqx_bridge:send_message(BridgeId, Params),
ok
end), end),
?retry( ?retry(
_Sleep = 1_000, _Sleep = 1_000,

View File

@@ -1064,9 +1064,17 @@ t_cluster(Config) ->
?check_trace( ?check_trace(
begin begin
Nodes = [N1, N2 | _] = start_cluster(Cluster), Nodes = [N1, N2 | _] = start_cluster(Cluster),
%% wait until bridge app supervisor is up; by that point,
%% `emqx_config_handler:add_handler' has been called and the node should be
%% ready to create bridges.
NumNodes = length(Nodes),
{ok, _} = snabbkaffe:block_until(
?match_n_events(NumNodes, #{?snk_kind := emqx_bridge_app_started}),
15_000
),
{ok, SRef0} = snabbkaffe:subscribe( {ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := pulsar_producer_bridge_started}), ?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
length(Nodes), NumNodes,
15_000 15_000
), ),
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),

View File

@@ -102,7 +102,6 @@ end_per_group(_Group, _Config) ->
ok. ok.
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->