diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index a042bfb67..984b3b04a 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2291,6 +2291,67 @@ t_expiration_retry_batch_multiple_times(_Config) -> ), ok. +t_recursive_flush(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + batch_time => 10_000, + worker_pool_size => 1 + } + ), + do_t_recursive_flush(). + +t_recursive_flush_batch(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 10_000, + worker_pool_size => 1 + } + ), + do_t_recursive_flush(). + +do_t_recursive_flush() -> + ?check_trace( + begin + Timeout = 1_000, + Pid = spawn_link(fun S() -> + emqx_resource:query(?ID, {inc_counter, 1}), + S() + end), + %% we want two reflushes to happen before we analyze the + %% trace, so that we get a single full interaction + {ok, _} = snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout + ), + unlink(Pid), + exit(Pid, kill), + ok + end, + fun(Trace) -> + %% check that a recursive flush leads to a new call to flush/1 + Pairs = ?find_pairs( + #{?snk_kind := buffer_worker_flush_ack_reflush}, + #{?snk_kind := buffer_worker_flush}, + Trace + ), + ?assert(lists:any(fun(E) -> E end, [true || {pair, _, _} <- Pairs])) + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------