diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 79220321e..8f093ef5c 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -506,7 +506,17 @@ t_write_failure(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), - {ok, _} = create_bridge(Config), + {ok, _} = create_bridge( + Config, + #{ + <<"resource_opts">> => + #{ + <<"auto_restart_interval">> => <<"100ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> + } + } + ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{ topic => atom_to_binary(?FUNCTION_NAME), @@ -523,7 +533,9 @@ t_write_failure(Config) -> async -> send_message(Config, SentData) end, - #{?snk_kind := buffer_worker_flush_nack}, + #{?snk_kind := Evt} when + Evt =:= buffer_worker_flush_nack orelse + Evt =:= buffer_worker_retry_inflight_failed, 10_000 ) end), diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 9f2011779..e4f17d76a 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -258,13 +258,18 @@ query_resource(Config, Request) -> emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). query_resource_async(Config, Request) -> + query_resource_async(Config, Request, _Opts = #{}). + +query_resource_async(Config, Request, Opts) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Timeout = maps:get(timeout, Opts, 500), Return = emqx_resource:query(ResourceID, Request, #{ - timeout => 500, async_reply_fun => {AsyncReplyFun, []} + timeout => Timeout, + async_reply_fun => {AsyncReplyFun, []} }), {Return, Ref}. @@ -498,9 +503,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"auto_restart_interval">> => <<"100ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), @@ -515,7 +520,7 @@ t_write_timeout(Config) -> Res1 = case QueryMode of async -> - query_resource_async(Config, {send_message, SentData}); + query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000}); sync -> query_resource(Config, {send_message, SentData}) end, @@ -526,7 +531,17 @@ t_write_timeout(Config) -> {_, Ref} when is_reference(Ref) -> case receive_result(Ref, 15_000) of {ok, Res} -> - ?assertMatch({error, {unrecoverable_error, _}}, Res); + %% we may receive a successful result depending on + %% timing, if the request is retried after the + %% failure is healed. + case Res of + {error, {unrecoverable_error, _}} -> + ok; + {ok, _} -> + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end; timeout -> ct:pal("mailbox:\n ~p", [process_info(self(), messages)]), ct:fail("no response received") diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index 913b29c86..ea2240efd 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -25,7 +25,12 @@ -include_lib("emqx/include/logger.hrl"). +-ifndef(TEST). -define(HEALTH_CHECK_TIMEOUT, 15000). +-else. +%% make tests faster +-define(HEALTH_CHECK_TIMEOUT, 1000). +-endif. start(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of