diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a0040922e..fdc4e3104 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -210,9 +210,9 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> - do_resume(St); + resume_from_blocked(St); blocked(state_timeout, resume, St) -> - do_resume(St); + resume_from_blocked(St); blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) -> #{ id := Id, @@ -227,6 +227,7 @@ blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), HasBeenSent = false, _ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)), + %% TODO collect requests NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]), Data = Data0#{queue := NewQ}, {keep_state, Data}; @@ -291,14 +292,14 @@ pick_cast(Id, Key, Query) -> ok end). -do_resume(#{id := Id, inflight_tid := InflightTID} = Data) -> +resume_from_blocked(Data) -> case inflight_get_first(InflightTID) of empty -> retry_queue(Data); {Ref, FirstQuery} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. - retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data) + retry_inflight_sync(Ref, FirstQuery, Data) end. retry_queue( @@ -387,14 +388,10 @@ retry_queue( end end. -retry_inflight_sync( - Id, - Ref, - QueryOrBatch, - InflightTID, - #{index := Index, resume_interval := ResumeT} = Data0 -) -> - QueryOpts = #{}, +retry_inflight_sync(Ref, QueryOrBatch, Data0) -> + #{id := Id, inflight_tid := InflightTID, index := Index, resume_interval := ResumeT} = + Data0 + QueryOpts = #{}, %% if we are retrying an inflight query, it has been sent HasBeenSent = true, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), @@ -416,7 +413,7 @@ retry_inflight_sync( %% we bump the counter when removing it from the table. IsDropped andalso PostFn(), ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), - do_resume(Data0) + resume_from_blocked(Data0) end. %% Called during the `running' state only.