test(bufworker): fix testcase flapping due to data races
This commit is contained in:
parent
0c80c31c9e
commit
ff473e0f1b
|
@ -618,6 +618,8 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
),
|
||||
tap_metrics(?LINE),
|
||||
|
||||
Sent1 = NumMsgs + BatchSize,
|
||||
|
||||
?check_trace(
|
||||
begin
|
||||
%% this will block the resource_worker as the inflight window is full now
|
||||
|
@ -633,6 +635,12 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
[]
|
||||
),
|
||||
|
||||
%% NOTE
|
||||
%% The query above won't affect the size of the results table for some reason,
|
||||
%% it's not clear if this is expected behaviour. Only the `async_reply_fun`
|
||||
%% defined below will be called for the whole batch consisting of 2 increments.
|
||||
Sent2 = Sent1 + 0,
|
||||
|
||||
tap_metrics(?LINE),
|
||||
%% send query now will fail because the resource is blocked.
|
||||
Insert = fun(Tab, Ref, Result) ->
|
||||
|
@ -658,7 +666,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
%% +2 because the tmp_query above will be retried and succeed
|
||||
%% this time.
|
||||
WindowSize + 2,
|
||||
10_000
|
||||
5_000
|
||||
),
|
||||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
||||
tap_metrics(?LINE),
|
||||
|
@ -666,8 +674,8 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
%% since the previous tmp_query was enqueued to be retried, we
|
||||
%% take it again from the table; this time, it should have
|
||||
%% succeeded.
|
||||
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
||||
?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||
?assertEqual([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
||||
?assertEqual(Sent2, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||
tap_metrics(?LINE),
|
||||
|
||||
%% send async query, this time everything should be ok.
|
||||
|
@ -679,7 +687,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
{ok, SRef} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||
NumBatches1,
|
||||
10_000
|
||||
5_000
|
||||
),
|
||||
inc_counter_in_parallel(NumMsgs1, ReqOpts),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef),
|
||||
|
@ -693,11 +701,10 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
)
|
||||
end
|
||||
),
|
||||
?assertEqual(
|
||||
NumMsgs + BatchSize + NumMsgs1,
|
||||
ets:info(Tab0, size),
|
||||
#{tab => ets:tab2list(Tab0)}
|
||||
),
|
||||
|
||||
Sent3 = Sent2 + NumMsgs1,
|
||||
|
||||
?assertEqual(Sent3, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||
tap_metrics(?LINE),
|
||||
|
||||
%% block the resource
|
||||
|
@ -720,22 +727,23 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
end
|
||||
),
|
||||
|
||||
Sent4 = Sent3 + NumMsgs + BatchSize,
|
||||
|
||||
%% this will block the resource_worker
|
||||
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
||||
|
||||
Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs,
|
||||
{ok, SRef1} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||
WindowSize,
|
||||
10_000
|
||||
WindowSize + 1,
|
||||
5_000
|
||||
),
|
||||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef1),
|
||||
?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||
?assertEqual(Sent4, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||
|
||||
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
|
||||
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
|
||||
?assert(Sent =< Counter),
|
||||
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent4]),
|
||||
?assert(Sent4 =< Counter),
|
||||
|
||||
%% give the metrics some time to stabilize.
|
||||
ct:sleep(1000),
|
||||
|
|
Loading…
Reference in New Issue