diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 24eb86d37..bc1aea734 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -585,7 +585,7 @@ flush(Data0) -> ?tp(buffer_worker_flush_before_pop, #{}), PopOpts = #{ count_limit => BatchSize, - stop_before => {fun stop_before_mixed_stop_after_render/2, initial_state} + stop_before => {fun stop_batching/2, initial_state} }, {Q1, QAckRef, Batch} = replayq:pop(Q0, PopOpts), Data2 = Data1#{queue := Q1}, @@ -623,72 +623,22 @@ flush(Data0) -> end end. -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - #{stop_action_after_render := true} = _TraceCtx - ), - initial_state -) -> +stop_batching(Query, initial_state) -> + get_stop_flag(Query); +stop_batching(Query, PrevStopFlag) -> + case get_stop_flag(Query) =:= PrevStopFlag of + true -> + PrevStopFlag; + false -> + %% We stop beceause we don't want a batch with mixed values for the + %% stop_action_after_render option + true + end. + +get_stop_flag(?QUERY(_, _, _, _, #{stop_action_after_render := true})) -> stop_action_after_render; -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - _TraceCtx - ), - initial_state -) -> - no_stop_action_after_render; -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - #{stop_action_after_render := true} = _TraceCtx - ), - no_stop_action_after_render -) -> - true; -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - #{stop_action_after_render := true} = _TraceCtx - ), - stop_action_after_render -) -> - stop_action_after_render; -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - _TraceCtx - ), - stop_action_after_render -) -> - true; -stop_before_mixed_stop_after_render( - ?QUERY( - _, - _, - _, - _, - _TraceCtx - ), - State -) -> - State. +get_stop_flag(_) -> + no_stop_action_after_render. -spec do_flush(data(), #{ is_batch := boolean(),