From 5dd24a64c384ea7dc1f2850b2727d311f9a4fca5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 13 Jan 2023 17:30:54 -0300 Subject: [PATCH] refactor(buffer_worker): check if inflight is full before flushing --- .../src/emqx_resource_worker.erl | 11 +++-- .../test/emqx_resource_SUITE.erl | 49 +++++++++---------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index f9940210a..eafe7cea6 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -370,13 +370,18 @@ maybe_flush(Data0) -> flush(Data0) -> #{ batch_size := BatchSize, + inflight_tid := InflightTID, queue := Q0 } = Data0, Data1 = cancel_flush_timer(Data0), - case queue_count(Q0) of - 0 -> + case {queue_count(Q0), is_inflight_full(InflightTID)} of + {0, _} -> {keep_state, Data1}; - _ -> + {_, true} -> + ?tp(resource_worker_flush_but_inflight_full, #{}), + Data2 = ensure_flush_timer(Data1), + {keep_state, Data2}; + {_, false} -> {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), IsBatch = BatchSize =/= 1, %% We *must* use the new queue, because we currently can't diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 047245c8b..d837cbc8c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -436,8 +436,8 @@ t_query_counter_async_inflight(_) -> %% this will block the resource_worker as the inflight window is full now {ok, {ok, _}} = ?wait_async_action( - emqx_resource:query(?ID, {inc_counter, 2}), - #{?snk_kind := resource_worker_enter_blocked}, + emqx_resource:query(?ID, {inc_counter, 199}), + #{?snk_kind := resource_worker_flush_but_inflight_full}, 1_000 ), ?assertMatch(0, ets:info(Tab0, size)), @@ -449,25 +449,24 @@ t_query_counter_async_inflight(_) -> ets:insert(Tab, {Ref, Result}), ?tp(tmp_query_inserted, #{}) end, - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:query(?ID, {inc_counter, 3}, #{ - async_reply_fun => {Insert, [Tab0, tmp_query]} - }), - #{?snk_kind := tmp_query_inserted}, - 1_000 - ), %% since this counts as a failure, it'll be enqueued and retried %% later, when the resource is unblocked. - ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 99}, #{ + async_reply_fun => {Insert, [Tab0, tmp_query]} + }), + #{?snk_kind := resource_worker_appended_to_queue}, + 1_000 + ), tap_metrics(?LINE), %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +1 because the tmp_query above will be retried and succeed - %% this time. - WindowSize + 1, + %% +2 because the tmp_query above will be retried and succeed + %% this time, and there was the inc 199 request as well. + WindowSize + 2, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -478,8 +477,6 @@ t_query_counter_async_inflight(_) -> %% take it again from the table; this time, it should have %% succeeded. ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), - ?assertEqual(WindowSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), - tap_metrics(?LINE), %% send async query, this time everything should be ok. Num = 10, @@ -496,11 +493,12 @@ t_query_counter_async_inflight(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace), + ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + tap_metrics(?LINE), + ok end ), - ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), - tap_metrics(?LINE), %% block the resource ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), @@ -525,7 +523,7 @@ t_query_counter_async_inflight(_) -> ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), {ok, _} = snabbkaffe:receive_events(SRef1), - ?assertEqual(Sent, ets:info(Tab0, size)), + ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), @@ -642,7 +640,7 @@ t_query_counter_async_inflight_batch(_) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {inc_counter, 2}), - #{?snk_kind := resource_worker_enter_blocked}, + #{?snk_kind := resource_worker_flush_but_inflight_full}, 5_000 ), ?assertMatch(0, ets:info(Tab0, size)), @@ -658,17 +656,16 @@ t_query_counter_async_inflight_batch(_) -> ets:insert(Tab, {Ref, Result}), ?tp(tmp_query_inserted, #{}) end, + %% since this counts as a failure, it'll be enqueued and retried + %% later, when the resource is unblocked. {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {inc_counter, 3}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), - #{?snk_kind := tmp_query_inserted}, + #{?snk_kind := resource_worker_appended_to_queue}, 1_000 ), - %% since this counts as a failure, it'll be enqueued and retried - %% later, when the resource is unblocked. - ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), tap_metrics(?LINE), %% all responses should be received after the resource is resumed. @@ -745,7 +742,7 @@ t_query_counter_async_inflight_batch(_) -> ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), {ok, _} = snabbkaffe:receive_events(SRef1), - ?assertEqual(Sent, ets:info(Tab0, size)), + ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),