diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 601a77deb..38de2dc34 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -77,7 +77,7 @@ blocked -> ok; ok -> - maybe_flush_after_async_reply(IsFullBefore) + ok = maybe_flush_after_async_reply(IsFullBefore) end end)() ). @@ -486,15 +486,14 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - InflightCount = inflight_num_batches(InflightTID), ?tp(buffer_worker_flush, #{ queued => CurrentCount, is_inflight_full => IsFull, - inflight => InflightCount + inflight => inflight_count(InflightTID) }), case {CurrentCount, IsFull} of {0, _} -> - ?tp(buffer_worker_queue_drained, #{inflight => InflightCount}), + ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -626,7 +625,7 @@ do_flush( flush_worker(self()); false -> ?tp(buffer_worker_queue_drained, #{ - inflight => inflight_num_batches(InflightTID) + inflight => inflight_count(InflightTID) }), ok end, @@ -707,7 +706,7 @@ do_flush(#{queue := Q1} = Data0, #{ case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> ?tp(buffer_worker_queue_drained, #{ - inflight => inflight_num_batches(InflightTID) + inflight => inflight_count(InflightTID) }), Data1; {true, true} -> @@ -1336,10 +1335,10 @@ is_inflight_full(InflightTID) -> [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages %% because one batch request may hold several messages. - Size = inflight_num_batches(InflightTID), + Size = inflight_count(InflightTID), Size >= MaxSize. -inflight_num_batches(InflightTID) -> +inflight_count(InflightTID) -> case ets:info(InflightTID, size) of undefined -> 0; Size -> max(0, Size - ?INFLIGHT_META_ROWS) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 3b5f83d05..f41087b20 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -176,7 +176,7 @@ on_batch_query(InstId, BatchReq, State) -> batch_big_payload(sync, InstId, BatchReq, State); {random_reply, Num} -> %% async batch retried - random_reply(Num) + make_random_reply(Num) end. on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> @@ -313,11 +313,11 @@ counter_loop( %% with 'ok' in the result, the buffer worker should eventually %% drain the buffer (and inflights table) ReplyCount = 1 + (RandNum rem 3), - Results = random_replies(ReplyCount), + Results = make_random_replies(ReplyCount), %% add a delay to trigger inflight full - timer:sleep(5), lists:foreach( fun(Result) -> + timer:sleep(rand:uniform(5)), apply_reply(ReplyFun, Result) end, Results @@ -354,12 +354,12 @@ maybe_register(_Name, _Pid, false) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply(ReplyFun, Args ++ [Result]). -random_replies(0) -> +make_random_replies(0) -> []; -random_replies(N) -> - [random_reply(N) | random_replies(N - 1)]. +make_random_replies(N) -> + [make_random_reply(N) | make_random_replies(N - 1)]. -random_reply(N) -> +make_random_reply(N) -> case rand:uniform(3) of 1 -> {ok, N}; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e22ca7750..9b1031c42 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1513,6 +1513,7 @@ t_async_reply_multi_eval(_Config) -> ResumeInterval = 5, TotalTime = 5_000, AsyncInflightWindow = 3, + TotalQueries = AsyncInflightWindow * 5, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( ?ID, @@ -1528,49 +1529,33 @@ t_async_reply_multi_eval(_Config) -> resume_interval => ResumeInterval } ), - ?check_trace( - #{timetrap => 30_000}, - begin - %% block - ok = emqx_resource:simple_sync_query(?ID, block), - - ?wait_async_action( - inc_counter_in_parallel( - AsyncInflightWindow * 5, - fun() -> - Rand = rand:uniform(1000), - {random_reply, Rand} - end, - #{} - ), - #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, - TotalTime - ), - ok + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + inc_counter_in_parallel( + TotalQueries, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} end, - [ - fun(Trace) -> - ?assertMatch( - [#{inflight := 0} | _], - lists:reverse(?of_kind(buffer_worker_queue_drained, Trace)) - ) - end - ] + #{} ), - Metrics = tap_metrics(?LINE), - #{ - counters := Counters, - gauges := #{queuing := 0, inflight := 0} - } = Metrics, - #{ - matched := Matched, - success := Success, - dropped := Dropped, - late_reply := LateReply, - failed := Failed - } = Counters, - ?assertEqual(Matched, Success + Dropped + LateReply + Failed), - ok. + F = fun() -> + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + failed := Failed + } = Counters, + ?assertEqual(TotalQueries, Matched - 1), + ?assertEqual(Matched, Success + Dropped + LateReply + Failed) + end, + loop_wait(F, _Interval = 5, TotalTime). t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, @@ -2637,3 +2622,15 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. + +loop_wait(F, Interval, TotalTime) when Interval >= TotalTime -> + %% do it for the last time + F(); +loop_wait(F, Interval, TotalTime) -> + try + F() + catch + _:_ -> + timer:sleep(Interval), + loop_wait(F, Interval, TotalTime - Interval) + end.