diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a0facbed3..eb4921c72 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -871,23 +871,23 @@ estimate_size(QItem) -> append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> %% we must not append a raw binary because the marshaller will get %% lost. + Q0 = replayq:append(Q, Queries), Q2 = - case replayq:overflow(Q) of + case replayq:overflow(Q0) of Overflow when Overflow =< 0 -> - Q; + Q0; Overflow -> PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, - {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), + {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - Q3 = replayq:append(Q2, Queries), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q3)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), - Q3. + Q2. -spec get_first_n_from_queue(replayq:q(), pos_integer()) -> empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index a2767ee6c..047245c8b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1219,6 +1219,44 @@ t_delete_and_re_create_with_same_name(_Config) -> ), ok. +%% check that, if we configure a max queue size too small, then we +%% never send requests and always overflow. +t_always_overflow(_Config) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 1, + max_queue_bytes => 1, + resume_interval => 1_000 + } + ), + ?check_trace( + begin + Payload = binary:copy(<<"a">>, 100), + %% since it's sync and it should never send a request, this + %% errors with `timeout'. + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {big_payload, Payload}, + #{timeout => 500} + ) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(call_query_enter, Trace)), + ok + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------