Merge pull request #10812 from thalesmg/test-flakiness-20230524
test: attempts to reduce flakiness (pgsql, cassandra)
This commit is contained in:
commit
18d57ba3eb
|
@ -506,7 +506,17 @@ t_write_failure(Config) ->
|
|||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
QueryMode = ?config(query_mode, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
{ok, _} = create_bridge(
|
||||
Config,
|
||||
#{
|
||||
<<"resource_opts">> =>
|
||||
#{
|
||||
<<"auto_restart_interval">> => <<"100ms">>,
|
||||
<<"resume_interval">> => <<"100ms">>,
|
||||
<<"health_check_interval">> => <<"100ms">>
|
||||
}
|
||||
}
|
||||
),
|
||||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{
|
||||
topic => atom_to_binary(?FUNCTION_NAME),
|
||||
|
@ -523,7 +533,9 @@ t_write_failure(Config) ->
|
|||
async ->
|
||||
send_message(Config, SentData)
|
||||
end,
|
||||
#{?snk_kind := buffer_worker_flush_nack},
|
||||
#{?snk_kind := Evt} when
|
||||
Evt =:= buffer_worker_flush_nack orelse
|
||||
Evt =:= buffer_worker_retry_inflight_failed,
|
||||
10_000
|
||||
)
|
||||
end),
|
||||
|
|
|
@ -258,13 +258,18 @@ query_resource(Config, Request) ->
|
|||
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
||||
|
||||
query_resource_async(Config, Request) ->
|
||||
query_resource_async(Config, Request, _Opts = #{}).
|
||||
|
||||
query_resource_async(Config, Request, Opts) ->
|
||||
Name = ?config(pgsql_name, Config),
|
||||
BridgeType = ?config(pgsql_bridge_type, Config),
|
||||
Ref = alias([reply]),
|
||||
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
Timeout = maps:get(timeout, Opts, 500),
|
||||
Return = emqx_resource:query(ResourceID, Request, #{
|
||||
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
|
||||
timeout => Timeout,
|
||||
async_reply_fun => {AsyncReplyFun, []}
|
||||
}),
|
||||
{Return, Ref}.
|
||||
|
||||
|
@ -498,9 +503,9 @@ t_write_timeout(Config) ->
|
|||
Config,
|
||||
#{
|
||||
<<"resource_opts">> => #{
|
||||
<<"request_timeout">> => 500,
|
||||
<<"resume_interval">> => 100,
|
||||
<<"health_check_interval">> => 100
|
||||
<<"auto_restart_interval">> => <<"100ms">>,
|
||||
<<"resume_interval">> => <<"100ms">>,
|
||||
<<"health_check_interval">> => <<"100ms">>
|
||||
}
|
||||
}
|
||||
),
|
||||
|
@ -515,7 +520,7 @@ t_write_timeout(Config) ->
|
|||
Res1 =
|
||||
case QueryMode of
|
||||
async ->
|
||||
query_resource_async(Config, {send_message, SentData});
|
||||
query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
|
||||
sync ->
|
||||
query_resource(Config, {send_message, SentData})
|
||||
end,
|
||||
|
@ -526,7 +531,17 @@ t_write_timeout(Config) ->
|
|||
{_, Ref} when is_reference(Ref) ->
|
||||
case receive_result(Ref, 15_000) of
|
||||
{ok, Res} ->
|
||||
?assertMatch({error, {unrecoverable_error, _}}, Res);
|
||||
%% we may receive a successful result depending on
|
||||
%% timing, if the request is retried after the
|
||||
%% failure is healed.
|
||||
case Res of
|
||||
{error, {unrecoverable_error, _}} ->
|
||||
ok;
|
||||
{ok, _} ->
|
||||
ok;
|
||||
_ ->
|
||||
ct:fail("unexpected result: ~p", [Res])
|
||||
end;
|
||||
timeout ->
|
||||
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
|
||||
ct:fail("no response received")
|
||||
|
|
|
@ -25,7 +25,12 @@
|
|||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-ifndef(TEST).
|
||||
-define(HEALTH_CHECK_TIMEOUT, 15000).
|
||||
-else.
|
||||
%% make tests faster
|
||||
-define(HEALTH_CHECK_TIMEOUT, 1000).
|
||||
-endif.
|
||||
|
||||
start(Name, Mod, Options) ->
|
||||
case ecpool:start_sup_pool(Name, Mod, Options) of
|
||||
|
|
Loading…
Reference in New Issue