diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 1516fc870..620516a88 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -618,6 +618,8 @@ t_query_counter_async_inflight_batch(_) -> ), tap_metrics(?LINE), + Sent1 = NumMsgs + BatchSize, + ?check_trace( begin %% this will block the resource_worker as the inflight window is full now @@ -633,6 +635,12 @@ t_query_counter_async_inflight_batch(_) -> [] ), + %% NOTE + %% The query above won't affect the size of the results table for some reason, + %% it's not clear if this is expected behaviour. Only the `async_reply_fun` + %% defined below will be called for the whole batch consisting of 2 increments. + Sent2 = Sent1 + 0, + tap_metrics(?LINE), %% send query now will fail because the resource is blocked. Insert = fun(Tab, Ref, Result) -> @@ -658,7 +666,7 @@ t_query_counter_async_inflight_batch(_) -> %% +2 because the tmp_query above will be retried and succeed %% this time. WindowSize + 2, - 10_000 + 5_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), tap_metrics(?LINE), @@ -666,8 +674,8 @@ t_query_counter_async_inflight_batch(_) -> %% since the previous tmp_query was enqueued to be retried, we %% take it again from the table; this time, it should have %% succeeded. - ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), - ?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual([{tmp_query, ok}], ets:take(Tab0, tmp_query)), + ?assertEqual(Sent2, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), %% send async query, this time everything should be ok. @@ -679,7 +687,7 @@ t_query_counter_async_inflight_batch(_) -> {ok, SRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), NumBatches1, - 10_000 + 5_000 ), inc_counter_in_parallel(NumMsgs1, ReqOpts), {ok, _} = snabbkaffe:receive_events(SRef), @@ -693,11 +701,10 @@ t_query_counter_async_inflight_batch(_) -> ) end ), - ?assertEqual( - NumMsgs + BatchSize + NumMsgs1, - ets:info(Tab0, size), - #{tab => ets:tab2list(Tab0)} - ), + + Sent3 = Sent2 + NumMsgs1, + + ?assertEqual(Sent3, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), %% block the resource @@ -720,22 +727,23 @@ t_query_counter_async_inflight_batch(_) -> end ), + Sent4 = Sent3 + NumMsgs + BatchSize, + %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 1}), - Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - WindowSize, - 10_000 + WindowSize + 1, + 5_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), {ok, _} = snabbkaffe:receive_events(SRef1), - ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual(Sent4, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), - ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), - ?assert(Sent =< Counter), + ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent4]), + ?assert(Sent4 =< Counter), %% give the metrics some time to stabilize. ct:sleep(1000),