refactor(buffer_worker): check if inflight is full before flushing

This commit is contained in:
Thales Macedo Garitezi 2023-01-13 17:30:54 -03:00
parent 344eeebe63
commit 5dd24a64c3
2 changed files with 31 additions and 29 deletions

View File

@ -370,13 +370,18 @@ maybe_flush(Data0) ->
flush(Data0) -> flush(Data0) ->
#{ #{
batch_size := BatchSize, batch_size := BatchSize,
inflight_tid := InflightTID,
queue := Q0 queue := Q0
} = Data0, } = Data0,
Data1 = cancel_flush_timer(Data0), Data1 = cancel_flush_timer(Data0),
case queue_count(Q0) of case {queue_count(Q0), is_inflight_full(InflightTID)} of
0 -> {0, _} ->
{keep_state, Data1}; {keep_state, Data1};
_ -> {_, true} ->
?tp(resource_worker_flush_but_inflight_full, #{}),
Data2 = ensure_flush_timer(Data1),
{keep_state, Data2};
{_, false} ->
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1, IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't %% We *must* use the new queue, because we currently can't

View File

@ -436,8 +436,8 @@ t_query_counter_async_inflight(_) ->
%% this will block the resource_worker as the inflight window is full now %% this will block the resource_worker as the inflight window is full now
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 2}), emqx_resource:query(?ID, {inc_counter, 199}),
#{?snk_kind := resource_worker_enter_blocked}, #{?snk_kind := resource_worker_flush_but_inflight_full},
1_000 1_000
), ),
?assertMatch(0, ets:info(Tab0, size)), ?assertMatch(0, ets:info(Tab0, size)),
@ -449,25 +449,24 @@ t_query_counter_async_inflight(_) ->
ets:insert(Tab, {Ref, Result}), ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{}) ?tp(tmp_query_inserted, #{})
end, end,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 3}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
#{?snk_kind := tmp_query_inserted},
1_000
),
%% since this counts as a failure, it'll be enqueued and retried %% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked. %% later, when the resource is unblocked.
?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), {ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 99}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
#{?snk_kind := resource_worker_appended_to_queue},
1_000
),
tap_metrics(?LINE), tap_metrics(?LINE),
%% all responses should be received after the resource is resumed. %% all responses should be received after the resource is resumed.
{ok, SRef0} = snabbkaffe:subscribe( {ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}), ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
%% +1 because the tmp_query above will be retried and succeed %% +2 because the tmp_query above will be retried and succeed
%% this time. %% this time, and there was the inc 199 request as well.
WindowSize + 1, WindowSize + 2,
_Timeout0 = 10_000 _Timeout0 = 10_000
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@ -478,8 +477,6 @@ t_query_counter_async_inflight(_) ->
%% take it again from the table; this time, it should have %% take it again from the table; this time, it should have
%% succeeded. %% succeeded.
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
?assertEqual(WindowSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
%% send async query, this time everything should be ok. %% send async query, this time everything should be ok.
Num = 10, Num = 10,
@ -496,11 +493,12 @@ t_query_counter_async_inflight(_) ->
end, end,
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace),
end
),
?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE), tap_metrics(?LINE),
ok
end
),
%% block the resource %% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
@ -525,7 +523,7 @@ t_query_counter_async_inflight(_) ->
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent, ets:info(Tab0, size)), ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE), tap_metrics(?LINE),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
@ -642,7 +640,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 2}), emqx_resource:query(?ID, {inc_counter, 2}),
#{?snk_kind := resource_worker_enter_blocked}, #{?snk_kind := resource_worker_flush_but_inflight_full},
5_000 5_000
), ),
?assertMatch(0, ets:info(Tab0, size)), ?assertMatch(0, ets:info(Tab0, size)),
@ -658,17 +656,16 @@ t_query_counter_async_inflight_batch(_) ->
ets:insert(Tab, {Ref, Result}), ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{}) ?tp(tmp_query_inserted, #{})
end, end,
%% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked.
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 3}, #{ emqx_resource:query(?ID, {inc_counter, 3}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]} async_reply_fun => {Insert, [Tab0, tmp_query]}
}), }),
#{?snk_kind := tmp_query_inserted}, #{?snk_kind := resource_worker_appended_to_queue},
1_000 1_000
), ),
%% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked.
?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
tap_metrics(?LINE), tap_metrics(?LINE),
%% all responses should be received after the resource is resumed. %% all responses should be received after the resource is resumed.
@ -745,7 +742,7 @@ t_query_counter_async_inflight_batch(_) ->
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent, ets:info(Tab0, size)), ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),