From 7a6465e2cfdf8fc0892938c9a9266c1676b3165b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 21:00:39 +0100 Subject: [PATCH] fix(buffer_worker): ensure flush timer reset in blocked state --- .../src/emqx_resource_buffer_worker.erl | 20 +++++++++++++------ .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index e6fa1c537..aaa07cf9a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -236,21 +236,24 @@ running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}), keep_state_and_data. -blocked(enter, _, #{resume_interval := ResumeT} = _St) -> +blocked(enter, _, #{resume_interval := ResumeT} = St0) -> ?tp(buffer_worker_enter_blocked, #{}), - {keep_state_and_data, {state_timeout, ResumeT, unblock}}; + %% discard the old timer, new timer will be started when entering running state again + St = cancel_flush_timer(St0), + {keep_state, St, {state_timeout, ResumeT, unblock}}; blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> resume_from_blocked(St); -blocked(cast, flush, Data) -> - resume_from_blocked(Data); +blocked(cast, flush, St) -> + resume_from_blocked(St); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> + %% ignore stale timer keep_state_and_data; blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) @@ -622,6 +625,9 @@ do_flush( }), flush_worker(self()); false -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_num_batches(InflightTID) + }), ok end, {keep_state, Data1} @@ -700,6 +706,9 @@ do_flush(#{queue := Q1} = Data0, #{ Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_num_batches(InflightTID) + }), Data1; {true, true} -> ?tp(buffer_worker_flush_ack_reflush, #{ @@ -1003,7 +1012,6 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, worker_index := Index, - buffer_worker := Pid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1100,7 +1108,7 @@ handle_async_batch_reply1( end. handle_async_batch_reply2([], _, _, _) -> - %% should have caused the unknown_async_reply_discarded + %% this usually should never happen unless the async callback is being evaluated concurrently ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index dfe64de24..e22ca7750 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1543,7 +1543,7 @@ t_async_reply_multi_eval(_Config) -> end, #{} ), - #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0}, + #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, TotalTime ), ok