diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index deca3a5cd..7ac6c5250 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -298,99 +298,13 @@ resume_from_blocked(Data) -> #{inflight_tid := InflightTID} = Data, case inflight_get_first(InflightTID) of empty -> - retry_queue(Data); + {next_state, running, Data}; {Ref, FirstQuery} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, FirstQuery, Data) end. -retry_queue( - #{ - queue := Q0, - id := Id, - index := Index, - batch_size := 1, - inflight_tid := InflightTID, - resume_interval := ResumeT - } = Data0 -) -> - %% no batching - case get_first_n_from_queue(Q0, 1) of - empty -> - {next_state, running, Data0}; - {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} -> - Data = Data0#{queue := Q1}, - QueryOpts = #{inflight_name => InflightTID}, - Ref = make_message_ref(), - Result = call_query(configured, Id, Index, Ref, Query, QueryOpts), - Reply = ?REPLY(undefined, Request, HasBeenSent, Result), - case reply_caller(Id, Reply) of - true -> - %% Still failed, but now it's in the inflight - %% table and marked as sent, except if the result - %% says inflight is full. In this case, we must - %% ensure it's indeed in the inflight table or - %% risk lose it. - ok = replayq:ack(Q1, QAckRef), - is_inflight_full_result(Result) andalso - inflight_append(InflightTID, Ref, Query, Id, Index), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - {keep_state, Data, {state_timeout, ResumeT, resume}}; - false -> - ok = replayq:ack(Q1, QAckRef), - is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - retry_queue(Data) - end - end; -retry_queue( - #{ - queue := Q, - id := Id, - index := Index, - batch_size := BatchSize, - inflight_tid := InflightTID, - resume_interval := ResumeT - } = Data0 -) -> - %% batching - case get_first_n_from_queue(Q, BatchSize) of - empty -> - {next_state, running, Data0}; - {Q1, QAckRef, Batch0} -> - Data = Data0#{queue := Q1}, - QueryOpts = #{inflight_name => InflightTID}, - Ref = make_message_ref(), - Result = call_query(configured, Id, Index, Ref, Batch0, QueryOpts), - %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, - %% we now change the 'from' field to 'undefined' so it will not reply the caller again. - Batch = [ - ?QUERY(undefined, Request, HasBeenSent0) - || ?QUERY(_, Request, HasBeenSent0) <- Batch0 - ], - case batch_reply_caller(Id, Result, Batch) of - true -> - ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}), - %% Still failed, but now it's in the inflight - %% table and marked as sent, except if the result - %% says inflight is full. In this case, we must - %% ensure it's indeed in the inflight table or - %% risk lose it. - ok = replayq:ack(Q1, QAckRef), - is_inflight_full_result(Result) andalso - inflight_append(InflightTID, Ref, Batch, Id, Index), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - {keep_state, Data, {state_timeout, ResumeT, resume}}; - false -> - ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}), - ok = replayq:ack(Q1, QAckRef), - is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - retry_queue(Data) - end - end. - retry_inflight_sync(Ref, QueryOrBatch, Data0) -> #{ id := Id, @@ -891,16 +805,6 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), Q2. --spec get_first_n_from_queue(replayq:q(), pos_integer()) -> - empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}. -get_first_n_from_queue(Q, N) -> - case queue_count(Q) of - 0 -> - empty; - _ -> - replayq:pop(Q, #{count_limit => N}) - end. - %%============================================================================== %% the inflight queue for async query -define(MAX_SIZE_REF, -1).