diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 63a402daa..3d08f0289 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -242,7 +242,7 @@ blocked(cast, flush, Data) -> resume_from_blocked(Data); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); -blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> +blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> @@ -437,9 +437,25 @@ collect_and_enqueue_query_requests(Request0, Data0) -> end, Requests ), - {_Overflow, NewQ} = append_queue(Id, Index, Q, Queries), + {Overflown, NewQ} = append_queue(Id, Index, Q, Queries), + ok = reply_overflown(Overflown), Data0#{queue := NewQ}. +reply_overflown([]) -> + ok; +reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) -> + do_reply_caller(From, {error, buffer_overflow}), + reply_overflown(More). + +do_reply_caller(undefined, _Result) -> + ok; +do_reply_caller({F, Args}, Result) when is_function(F) -> + _ = erlang:apply(F, Args ++ [Result]), + ok; +do_reply_caller(From, Result) -> + _ = gen_statem:reply(From, Result), + ok. + maybe_flush(Data0) -> #{ batch_size := BatchSize, @@ -1082,18 +1098,19 @@ queue_item_marshaller(Item) -> estimate_size(QItem) -> erlang:external_size(QItem). --spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q(). +-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> + {[queue_query()], replayq:q()}. append_queue(Id, Index, Q, Queries) -> %% this assertion is to ensure that we never append a raw binary %% because the marshaller will get lost. false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), - {Overflow, Q2} = + {Overflown, Q2} = case replayq:overflow(Q0) of - OverflowBytes when OverflowBytes =< 0 -> + OverflownBytes when OverflownBytes =< 0 -> {[], Q0}; - OverflowBytes -> - PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999}, + OverflownBytes -> + PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999}, {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), @@ -1112,10 +1129,10 @@ append_queue(Id, Index, Q, Queries) -> id => Id, items => Queries, queue_count => queue_count(Q2), - overflow => length(Overflow) + overflown => length(Overflown) } ), - {Overflow, Q2}. + {Overflown, Q2}. %%============================================================================== %% the inflight queue for async query diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 9b2af74f6..34a92a5a2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1226,8 +1226,8 @@ t_always_overflow(_Config) -> Payload = binary:copy(<<"a">>, 100), %% since it's sync and it should never send a request, this %% errors with `timeout'. - ?assertError( - timeout, + ?assertEqual( + {error, buffer_overflow}, emqx_resource:query( ?ID, {big_payload, Payload},