fix(buffer): count inflight batches properly
This commit is contained in:
parent
d0c10b59aa
commit
c76311c9c3
|
@ -1121,6 +1121,10 @@ append_queue(Id, Index, Q, Queries) ->
|
||||||
-define(INITIAL_TIME_REF, initial_time).
|
-define(INITIAL_TIME_REF, initial_time).
|
||||||
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
|
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
|
||||||
|
|
||||||
|
%% NOTE
|
||||||
|
%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝
|
||||||
|
-define(INFLIGHT_META_ROWS, 4).
|
||||||
|
|
||||||
inflight_new(InfltWinSZ, Id, Index) ->
|
inflight_new(InfltWinSZ, Id, Index) ->
|
||||||
TableId = ets:new(
|
TableId = ets:new(
|
||||||
emqx_resource_buffer_worker_inflight_tab,
|
emqx_resource_buffer_worker_inflight_tab,
|
||||||
|
@ -1181,12 +1185,9 @@ is_inflight_full(InflightTID) ->
|
||||||
Size >= MaxSize.
|
Size >= MaxSize.
|
||||||
|
|
||||||
inflight_num_batches(InflightTID) ->
|
inflight_num_batches(InflightTID) ->
|
||||||
%% Note: we subtract 2 because there're 2 metadata rows that hold
|
|
||||||
%% the maximum size value and the number of messages.
|
|
||||||
MetadataRowCount = 2,
|
|
||||||
case ets:info(InflightTID, size) of
|
case ets:info(InflightTID, size) of
|
||||||
undefined -> 0;
|
undefined -> 0;
|
||||||
Size -> max(0, Size - MetadataRowCount)
|
Size -> max(0, Size - ?INFLIGHT_META_ROWS)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
inflight_num_msgs(InflightTID) ->
|
inflight_num_msgs(InflightTID) ->
|
||||||
|
|
|
@ -411,7 +411,8 @@ t_query_counter_async_inflight(_) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
{_, {ok, _}} =
|
{_, {ok, _}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
inc_counter_in_parallel(WindowSize, ReqOpts),
|
%% one more so that inflight would be already full upon last query
|
||||||
|
inc_counter_in_parallel(WindowSize + 1, ReqOpts),
|
||||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||||
1_000
|
1_000
|
||||||
),
|
),
|
||||||
|
@ -445,9 +446,9 @@ t_query_counter_async_inflight(_) ->
|
||||||
%% all responses should be received after the resource is resumed.
|
%% all responses should be received after the resource is resumed.
|
||||||
{ok, SRef0} = snabbkaffe:subscribe(
|
{ok, SRef0} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||||
%% +1 because the tmp_query above will be retried and succeed
|
%% +2 because the tmp_query above will be retried and succeed
|
||||||
%% this time.
|
%% this time.
|
||||||
WindowSize + 1,
|
WindowSize + 2,
|
||||||
_Timeout0 = 10_000
|
_Timeout0 = 10_000
|
||||||
),
|
),
|
||||||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
||||||
|
@ -475,7 +476,7 @@ t_query_counter_async_inflight(_) ->
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
QueryTrace = ?of_kind(call_query_async, Trace),
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
||||||
?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
|
?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
|
||||||
?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||||
tap_metrics(?LINE),
|
tap_metrics(?LINE),
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
|
@ -487,7 +488,8 @@ t_query_counter_async_inflight(_) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
{_, {ok, _}} =
|
{_, {ok, _}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
inc_counter_in_parallel(WindowSize, ReqOpts),
|
%% one more so that inflight would be already full upon last query
|
||||||
|
inc_counter_in_parallel(WindowSize + 1, ReqOpts),
|
||||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||||
1_000
|
1_000
|
||||||
),
|
),
|
||||||
|
@ -500,10 +502,10 @@ t_query_counter_async_inflight(_) ->
|
||||||
%% this will block the resource_worker
|
%% this will block the resource_worker
|
||||||
ok = emqx_resource:query(?ID, {inc_counter, 4}),
|
ok = emqx_resource:query(?ID, {inc_counter, 4}),
|
||||||
|
|
||||||
Sent = WindowSize + Num + WindowSize,
|
Sent = WindowSize + 1 + Num + WindowSize + 1,
|
||||||
{ok, SRef1} = snabbkaffe:subscribe(
|
{ok, SRef1} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||||
WindowSize,
|
WindowSize + 1,
|
||||||
_Timeout0 = 10_000
|
_Timeout0 = 10_000
|
||||||
),
|
),
|
||||||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
||||||
|
@ -593,7 +595,8 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
{_, {ok, _}} =
|
{_, {ok, _}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
inc_counter_in_parallel(NumMsgs, ReqOpts),
|
%% a batch more so that inflight would be already full upon last query
|
||||||
|
inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
|
||||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||||
5_000
|
5_000
|
||||||
),
|
),
|
||||||
|
@ -652,9 +655,9 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
%% all responses should be received after the resource is resumed.
|
%% all responses should be received after the resource is resumed.
|
||||||
{ok, SRef0} = snabbkaffe:subscribe(
|
{ok, SRef0} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||||
%% +1 because the tmp_query above will be retried and succeed
|
%% +2 because the tmp_query above will be retried and succeed
|
||||||
%% this time.
|
%% this time.
|
||||||
WindowSize + 1,
|
WindowSize + 2,
|
||||||
10_000
|
10_000
|
||||||
),
|
),
|
||||||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
||||||
|
@ -664,7 +667,7 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
%% take it again from the table; this time, it should have
|
%% take it again from the table; this time, it should have
|
||||||
%% succeeded.
|
%% succeeded.
|
||||||
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
||||||
?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
||||||
tap_metrics(?LINE),
|
tap_metrics(?LINE),
|
||||||
|
|
||||||
%% send async query, this time everything should be ok.
|
%% send async query, this time everything should be ok.
|
||||||
|
@ -691,7 +694,7 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
NumMsgs + NumMsgs1,
|
NumMsgs + BatchSize + NumMsgs1,
|
||||||
ets:info(Tab0, size),
|
ets:info(Tab0, size),
|
||||||
#{tab => ets:tab2list(Tab0)}
|
#{tab => ets:tab2list(Tab0)}
|
||||||
),
|
),
|
||||||
|
@ -703,7 +706,8 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
{_, {ok, _}} =
|
{_, {ok, _}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
inc_counter_in_parallel(NumMsgs, ReqOpts),
|
%% a batch more so that inflight would be already full upon last query
|
||||||
|
inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
|
||||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||||
5_000
|
5_000
|
||||||
),
|
),
|
||||||
|
@ -719,7 +723,7 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
%% this will block the resource_worker
|
%% this will block the resource_worker
|
||||||
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
||||||
|
|
||||||
Sent = NumMsgs + NumMsgs1 + NumMsgs,
|
Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs,
|
||||||
{ok, SRef1} = snabbkaffe:subscribe(
|
{ok, SRef1} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
||||||
WindowSize,
|
WindowSize,
|
||||||
|
|
Loading…
Reference in New Issue