Merge pull request #9879 from keynslug/fwup/fix/buffer-worker-testcase

test(bufworker): fix testcase flapping due to data races
This commit is contained in:
Andrew Mayorov 2023-02-01 15:33:59 +04:00 committed by GitHub
commit 979e9804be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 15 deletions

View File

@ -618,6 +618,8 @@ t_query_counter_async_inflight_batch(_) ->
), ),
tap_metrics(?LINE), tap_metrics(?LINE),
Sent1 = NumMsgs + BatchSize,
?check_trace( ?check_trace(
begin begin
%% this will block the resource_worker as the inflight window is full now %% this will block the resource_worker as the inflight window is full now
@ -633,6 +635,12 @@ t_query_counter_async_inflight_batch(_) ->
[] []
), ),
%% NOTE
%% The query above won't affect the size of the results table for some reason,
%% it's not clear if this is expected behaviour. Only the `async_reply_fun`
%% defined below will be called for the whole batch consisting of 2 increments.
Sent2 = Sent1 + 0,
tap_metrics(?LINE), tap_metrics(?LINE),
%% send query now will fail because the resource is blocked. %% send query now will fail because the resource is blocked.
Insert = fun(Tab, Ref, Result) -> Insert = fun(Tab, Ref, Result) ->
@ -658,7 +666,7 @@ t_query_counter_async_inflight_batch(_) ->
%% +2 because the tmp_query above will be retried and succeed %% +2 because the tmp_query above will be retried and succeed
%% this time. %% this time.
WindowSize + 2, WindowSize + 2,
10_000 5_000
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE), tap_metrics(?LINE),
@ -666,8 +674,8 @@ t_query_counter_async_inflight_batch(_) ->
%% since the previous tmp_query was enqueued to be retried, we %% since the previous tmp_query was enqueued to be retried, we
%% take it again from the table; this time, it should have %% take it again from the table; this time, it should have
%% succeeded. %% succeeded.
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), ?assertEqual([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), ?assertEqual(Sent2, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE), tap_metrics(?LINE),
%% send async query, this time everything should be ok. %% send async query, this time everything should be ok.
@ -679,7 +687,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, SRef} = snabbkaffe:subscribe( {ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}), ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NumBatches1, NumBatches1,
10_000 5_000
), ),
inc_counter_in_parallel(NumMsgs1, ReqOpts), inc_counter_in_parallel(NumMsgs1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef), {ok, _} = snabbkaffe:receive_events(SRef),
@ -693,11 +701,10 @@ t_query_counter_async_inflight_batch(_) ->
) )
end end
), ),
?assertEqual(
NumMsgs + BatchSize + NumMsgs1, Sent3 = Sent2 + NumMsgs1,
ets:info(Tab0, size),
#{tab => ets:tab2list(Tab0)} ?assertEqual(Sent3, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
),
tap_metrics(?LINE), tap_metrics(?LINE),
%% block the resource %% block the resource
@ -720,22 +727,23 @@ t_query_counter_async_inflight_batch(_) ->
end end
), ),
Sent4 = Sent3 + NumMsgs + BatchSize,
%% this will block the resource_worker %% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}), ok = emqx_resource:query(?ID, {inc_counter, 1}),
Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs,
{ok, SRef1} = snabbkaffe:subscribe( {ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}), ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize, WindowSize + 1,
10_000 5_000
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), ?assertEqual(Sent4, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent4]),
?assert(Sent =< Counter), ?assert(Sent4 =< Counter),
%% give the metrics some time to stabilize. %% give the metrics some time to stabilize.
ct:sleep(1000), ct:sleep(1000),