fix(buffer_worker): check for overflow after enqueuing new requests
This commit is contained in:
parent
4cb83d0c9a
commit
81fc561ed5
|
@ -871,23 +871,23 @@ estimate_size(QItem) ->
|
||||||
append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
||||||
%% we must not append a raw binary because the marshaller will get
|
%% we must not append a raw binary because the marshaller will get
|
||||||
%% lost.
|
%% lost.
|
||||||
|
Q0 = replayq:append(Q, Queries),
|
||||||
Q2 =
|
Q2 =
|
||||||
case replayq:overflow(Q) of
|
case replayq:overflow(Q0) of
|
||||||
Overflow when Overflow =< 0 ->
|
Overflow when Overflow =< 0 ->
|
||||||
Q;
|
Q0;
|
||||||
Overflow ->
|
Overflow ->
|
||||||
PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
|
PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
|
||||||
{Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
|
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
Dropped = length(Items2),
|
Dropped = length(Items2),
|
||||||
emqx_resource_metrics:dropped_queue_full_inc(Id),
|
emqx_resource_metrics:dropped_queue_full_inc(Id),
|
||||||
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
|
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
|
||||||
Q1
|
Q1
|
||||||
end,
|
end,
|
||||||
Q3 = replayq:append(Q2, Queries),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q3)),
|
|
||||||
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
||||||
Q3.
|
Q2.
|
||||||
|
|
||||||
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
|
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
|
||||||
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
|
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
|
||||||
|
|
|
@ -1219,6 +1219,44 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% check that, if we configure a max queue size too small, then we
|
||||||
|
%% never send requests and always overflow.
|
||||||
|
t_always_overflow(_Config) ->
|
||||||
|
{ok, _} = emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
#{
|
||||||
|
query_mode => sync,
|
||||||
|
batch_size => 1,
|
||||||
|
worker_pool_size => 1,
|
||||||
|
max_queue_bytes => 1,
|
||||||
|
resume_interval => 1_000
|
||||||
|
}
|
||||||
|
),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
Payload = binary:copy(<<"a">>, 100),
|
||||||
|
%% since it's sync and it should never send a request, this
|
||||||
|
%% errors with `timeout'.
|
||||||
|
?assertError(
|
||||||
|
timeout,
|
||||||
|
emqx_resource:query(
|
||||||
|
?ID,
|
||||||
|
{big_payload, Payload},
|
||||||
|
#{timeout => 500}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertEqual([], ?of_kind(call_query_enter, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue