diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 59fd41a03..6875341b4 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -33,7 +33,8 @@ sync_query/3, async_query/3, block/1, - resume/1 + resume/1, + flush_worker/1 ]). -export([ @@ -152,6 +153,10 @@ block(ServerRef) -> resume(ServerRef) -> gen_statem:cast(ServerRef, resume). +-spec flush_worker(pid()) -> ok. +flush_worker(ServerRef) -> + gen_statem:cast(ServerRef, flush). + -spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()). init({Id, Index, Opts}) -> process_flag(trap_exit, true), @@ -196,6 +201,8 @@ running(enter, _, St) -> maybe_flush(St); running(cast, resume, _St) -> keep_state_and_data; +running(cast, flush, Data) -> + flush(Data); running(cast, block, St) -> {next_state, blocked, St}; running(info, ?SEND_REQ(_From, _Req) = Request0, Data) -> @@ -222,6 +229,8 @@ blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> resume_from_blocked(St); +blocked(cast, flush, Data) -> + resume_from_blocked(Data); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> @@ -834,6 +843,7 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee %% returned, otherwise the request will get retried. The %% caller has just been notified of the failure and should %% decide if it wants to retry or not. + IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso PostFn(), case Action of @@ -850,6 +860,7 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee batch_or_query => ?QUERY(From, Request, HasBeenSent), result => Result }), + IsFullBefore andalso ?MODULE:flush_worker(Pid), ok end. @@ -862,6 +873,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> %% returned, otherwise the request will get retried. The %% caller has just been notified of the failure and should %% decide if it wants to retry or not. + IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), case Action of @@ -874,6 +886,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> ?tp(resource_worker_reply_after_query, #{ action => ack, batch_or_query => Batch, result => Result }), + IsFullBefore andalso ?MODULE:flush_worker(Pid), ok end. @@ -942,6 +955,8 @@ inflight_get_first_retriable(InflightTID) -> {Ref, BatchOrQuery} end. +is_inflight_full(undefined) -> + false; is_inflight_full(InflightTID) -> [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages