diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 4d3e5be5a..5edee241f 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -445,17 +445,17 @@ handle_query_requests(Request0, Data0) -> Data = Data0#{queue := NewQ}, maybe_flush(Data). -maybe_flush(Data) -> +maybe_flush(Data0) -> #{ batch_size := BatchSize, queue := Q - } = Data, + } = Data0, QueueCount = queue_count(Q), case QueueCount >= BatchSize of true -> - flush(Data); + flush(Data0); false -> - {keep_state, ensure_flush_timer(Data)} + {keep_state, ensure_flush_timer(Data0)} end. %% Called during the `running' state only. @@ -465,19 +465,19 @@ flush(Data0) -> batch_size := BatchSize, queue := Q0 } = Data0, + Data1 = cancel_flush_timer(Data0), case replayq:count(Q0) of 0 -> - Data = cancel_flush_timer(Data0), - {keep_state, Data}; + {keep_state, Data1}; _ -> {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), IsBatch = BatchSize =/= 1, %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. %% Maybe we could re-open the queue? - Data1 = Data0#{queue := Q1}, + Data2 = Data1#{queue := Q1}, Ref = make_message_ref(), - do_flush(Data1, #{ + do_flush(Data2, #{ new_queue => Q1, is_batch => IsBatch, batch => Batch, @@ -511,7 +511,6 @@ do_flush( [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), - Data1 = cancel_flush_timer(Data0), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), case reply_caller(Id, Reply) of %% Failed; remove the request from the queue, as we cannot pop @@ -525,7 +524,7 @@ do_flush( is_not_connected_result(Result), ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - {next_state, blocked, Data1}; + {next_state, blocked, Data0}; %% Success; just ack. false -> ok = replayq:ack(Q1, QAckRef), @@ -533,9 +532,9 @@ do_flush( emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), case replayq:count(Q1) > 0 of true -> - {keep_state, Data1, [{next_event, internal, flush}]}; + {keep_state, Data0, [{next_event, internal, flush}]}; false -> - {keep_state, Data1} + {keep_state, Data0} end end; do_flush(Data0, #{ @@ -553,7 +552,6 @@ do_flush(Data0, #{ } = Data0, QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), - Data1 = cancel_flush_timer(Data0), case batch_reply_caller(Id, Result, Batch) of %% Failed; remove the request from the queue, as we cannot pop %% from it again. But we must ensure it's in the inflight @@ -566,7 +564,7 @@ do_flush(Data0, #{ is_not_connected_result(Result), ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - {next_state, blocked, Data1}; + {next_state, blocked, Data0}; %% Success; just ack. false -> ok = replayq:ack(Q1, QAckRef), @@ -575,12 +573,12 @@ do_flush(Data0, #{ CurrentCount = replayq:count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> - {keep_state, Data1}; + {keep_state, Data0}; {true, true} -> - {keep_state, Data1, [{next_event, internal, flush}]}; + {keep_state, Data0, [{next_event, internal, flush}]}; {true, false} -> - Data2 = ensure_flush_timer(Data1), - {keep_state, Data2} + Data1 = ensure_flush_timer(Data0), + {keep_state, Data1} end end.