From c76311c9c306c8ede80b54691caedb52a9d5442c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 31 Jan 2023 14:17:45 +0300 Subject: [PATCH] fix(buffer): count inflight batches properly --- .../src/emqx_resource_buffer_worker.erl | 9 +++--- .../test/emqx_resource_SUITE.erl | 32 +++++++++++-------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 50534df4f..c5395c8df 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1121,6 +1121,10 @@ append_queue(Id, Index, Q, Queries) -> -define(INITIAL_TIME_REF, initial_time). -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). +%% NOTE +%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝ +-define(INFLIGHT_META_ROWS, 4). + inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( emqx_resource_buffer_worker_inflight_tab, @@ -1181,12 +1185,9 @@ is_inflight_full(InflightTID) -> Size >= MaxSize. inflight_num_batches(InflightTID) -> - %% Note: we subtract 2 because there're 2 metadata rows that hold - %% the maximum size value and the number of messages. - MetadataRowCount = 2, case ets:info(InflightTID, size) of undefined -> 0; - Size -> max(0, Size - MetadataRowCount) + Size -> max(0, Size - ?INFLIGHT_META_ROWS) end. 
inflight_num_msgs(InflightTID) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 227b6fedc..27101d1cc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -411,7 +411,8 @@ t_query_counter_async_inflight(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(WindowSize, ReqOpts), + %% one more query, so that the inflight window is already full when the last query arrives + inc_counter_in_parallel(WindowSize + 1, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), @@ -445,9 +446,9 @@ t_query_counter_async_inflight(_) -> %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +1 because the tmp_query above will be retried and succeed + %% +2: one for the extra query sent above, and one because the tmp_query above will be retried and succeed %% this time. - WindowSize + 1, + WindowSize + 2, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -475,7 +476,7 @@ t_query_counter_async_inflight(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace), - ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), ok end @@ -487,7 +488,8 @@ t_query_counter_async_inflight(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(WindowSize, ReqOpts), + %% one more query, so that the inflight window is already full when the last query arrives + inc_counter_in_parallel(WindowSize + 1, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), @@ -500,10 +502,10 @@ t_query_counter_async_inflight(_) -> %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 
4}), - Sent = WindowSize + Num + WindowSize, + Sent = WindowSize + 1 + Num + WindowSize + 1, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - WindowSize, + WindowSize + 1, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -593,7 +595,8 @@ t_query_counter_async_inflight_batch(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(NumMsgs, ReqOpts), + %% one more batch, so that the inflight window is already full when the last query arrives + inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), @@ -652,9 +655,9 @@ t_query_counter_async_inflight_batch(_) -> %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +1 because the tmp_query above will be retried and succeed + %% +2: one for the extra batch sent above, and one because the tmp_query above will be retried and succeed %% this time. - WindowSize + 1, + WindowSize + 2, 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -664,7 +667,7 @@ t_query_counter_async_inflight_batch(_) -> %% take it again from the table; this time, it should have %% succeeded. ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), - ?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), %% send async query, this time everything should be ok. 
@@ -691,7 +694,7 @@ t_query_counter_async_inflight_batch(_) -> end ), ?assertEqual( - NumMsgs + NumMsgs1, + NumMsgs + BatchSize + NumMsgs1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)} ), @@ -703,7 +706,8 @@ t_query_counter_async_inflight_batch(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(NumMsgs, ReqOpts), + %% one more batch, so that the inflight window is already full when the last query arrives + inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), @@ -719,7 +723,7 @@ t_query_counter_async_inflight_batch(_) -> %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 1}), - Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), WindowSize,