diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 82c95cb99..bc146cc8e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1944,7 +1944,7 @@ t_expiration_async_batch_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => 2_000 @@ -1959,7 +1959,7 @@ do_t_expiration_async_after_reply(IsBatch) -> NAcks = case IsBatch of batch -> 1; - single -> 2 + single -> 3 end, ?force_ordering( #{?snk_kind := buffer_worker_flush_ack}, @@ -1980,6 +1980,10 @@ do_t_expiration_async_after_reply(IsBatch) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), ?assertEqual( ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) ), @@ -2004,23 +2008,37 @@ do_t_expiration_async_after_reply(IsBatch) -> ok end, fun(Trace) -> - ?assertMatch( - [ - #{ - expired := [{query, _, {inc_counter, 199}, _, _}] - } - ], - ?of_kind(handle_async_reply_expired, Trace) - ), + case IsBatch of + batch -> + ?assertMatch( + [ + #{ + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] + } + ], + ?of_kind(handle_async_reply_expired, Trace) + ); + single -> + ?assertMatch( + [ + #{expired := [{query, _, {inc_counter, 199}, _, _}]}, + #{expired := [{query, _, {inc_counter, 299}, _, _}]} + ], + ?of_kind(handle_async_reply_expired, Trace) + ) + end, Metrics = tap_metrics(?LINE), ?assertMatch( #{ counters := #{ - matched := 2, + matched := 3, %% the request with infinity timeout. success := 1, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 } @@ -2042,7 +2060,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => ResumeInterval @@ -2067,6 +2085,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), Pid0 = spawn_link(fun() -> ?tp(delay_enter, #{}), @@ -2087,7 +2109,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( [ #{ - expired := [{query, _, {inc_counter, 199}, _, _}] + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] } ], ?of_kind(handle_async_reply_expired, Trace) @@ -2096,10 +2121,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( #{ counters := #{ - matched := 1, + matched := 2, success := 0, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 },