fix: reply with {error, buffer_overflow} when discarded
This commit is contained in:
parent
ed28789164
commit
1f799dfd59
|
@ -242,7 +242,7 @@ blocked(cast, flush, Data) ->
|
||||||
resume_from_blocked(Data);
|
resume_from_blocked(Data);
|
||||||
blocked(state_timeout, unblock, St) ->
|
blocked(state_timeout, unblock, St) ->
|
||||||
resume_from_blocked(St);
|
resume_from_blocked(St);
|
||||||
blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
|
blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) ->
|
||||||
Data = collect_and_enqueue_query_requests(Request0, Data0),
|
Data = collect_and_enqueue_query_requests(Request0, Data0),
|
||||||
{keep_state, Data};
|
{keep_state, Data};
|
||||||
blocked(info, {flush, _Ref}, _Data) ->
|
blocked(info, {flush, _Ref}, _Data) ->
|
||||||
|
@ -437,9 +437,25 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
|
||||||
end,
|
end,
|
||||||
Requests
|
Requests
|
||||||
),
|
),
|
||||||
{_Overflow, NewQ} = append_queue(Id, Index, Q, Queries),
|
{Overflown, NewQ} = append_queue(Id, Index, Q, Queries),
|
||||||
|
ok = reply_overflown(Overflown),
|
||||||
Data0#{queue := NewQ}.
|
Data0#{queue := NewQ}.
|
||||||
|
|
||||||
|
reply_overflown([]) ->
|
||||||
|
ok;
|
||||||
|
reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) ->
|
||||||
|
do_reply_caller(From, {error, buffer_overflow}),
|
||||||
|
reply_overflown(More).
|
||||||
|
|
||||||
|
do_reply_caller(undefined, _Result) ->
|
||||||
|
ok;
|
||||||
|
do_reply_caller({F, Args}, Result) when is_function(F) ->
|
||||||
|
_ = erlang:apply(F, Args ++ [Result]),
|
||||||
|
ok;
|
||||||
|
do_reply_caller(From, Result) ->
|
||||||
|
_ = gen_statem:reply(From, Result),
|
||||||
|
ok.
|
||||||
|
|
||||||
maybe_flush(Data0) ->
|
maybe_flush(Data0) ->
|
||||||
#{
|
#{
|
||||||
batch_size := BatchSize,
|
batch_size := BatchSize,
|
||||||
|
@ -1082,18 +1098,19 @@ queue_item_marshaller(Item) ->
|
||||||
estimate_size(QItem) ->
|
estimate_size(QItem) ->
|
||||||
erlang:external_size(QItem).
|
erlang:external_size(QItem).
|
||||||
|
|
||||||
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q().
|
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
|
||||||
|
{[queue_query()], replayq:q()}.
|
||||||
append_queue(Id, Index, Q, Queries) ->
|
append_queue(Id, Index, Q, Queries) ->
|
||||||
%% this assertion is to ensure that we never append a raw binary
|
%% this assertion is to ensure that we never append a raw binary
|
||||||
%% because the marshaller will get lost.
|
%% because the marshaller will get lost.
|
||||||
false = is_binary(hd(Queries)),
|
false = is_binary(hd(Queries)),
|
||||||
Q0 = replayq:append(Q, Queries),
|
Q0 = replayq:append(Q, Queries),
|
||||||
{Overflow, Q2} =
|
{Overflown, Q2} =
|
||||||
case replayq:overflow(Q0) of
|
case replayq:overflow(Q0) of
|
||||||
OverflowBytes when OverflowBytes =< 0 ->
|
OverflownBytes when OverflownBytes =< 0 ->
|
||||||
{[], Q0};
|
{[], Q0};
|
||||||
OverflowBytes ->
|
OverflownBytes ->
|
||||||
PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999},
|
PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
|
||||||
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
|
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
Dropped = length(Items2),
|
Dropped = length(Items2),
|
||||||
|
@ -1112,10 +1129,10 @@ append_queue(Id, Index, Q, Queries) ->
|
||||||
id => Id,
|
id => Id,
|
||||||
items => Queries,
|
items => Queries,
|
||||||
queue_count => queue_count(Q2),
|
queue_count => queue_count(Q2),
|
||||||
overflow => length(Overflow)
|
overflown => length(Overflown)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{Overflow, Q2}.
|
{Overflown, Q2}.
|
||||||
|
|
||||||
%%==============================================================================
|
%%==============================================================================
|
||||||
%% the inflight queue for async query
|
%% the inflight queue for async query
|
||||||
|
|
|
@ -1226,8 +1226,8 @@ t_always_overflow(_Config) ->
|
||||||
Payload = binary:copy(<<"a">>, 100),
|
Payload = binary:copy(<<"a">>, 100),
|
||||||
%% since it's sync and it should never send a request, this
|
%% since it's sync and it should never send a request, this
|
||||||
%% errors with `timeout'.
|
%% errors with `timeout'.
|
||||||
?assertError(
|
?assertEqual(
|
||||||
timeout,
|
{error, buffer_overflow},
|
||||||
emqx_resource:query(
|
emqx_resource:query(
|
||||||
?ID,
|
?ID,
|
||||||
{big_payload, Payload},
|
{big_payload, Payload},
|
||||||
|
|
Loading…
Reference in New Issue