Merge pull request #9845 from zmstone/0126-reply-caller-for-buffer-overflow-queue-items

0126 reply caller for buffer overflow queue items
Zaiming (Stone) Shi 2023-01-27 11:32:59 +01:00 committed by GitHub
commit 965236c888
5 changed files with 72 additions and 46 deletions

View File

@@ -424,7 +424,13 @@ check_config(SchemaMod, RawConf, Opts0) ->
 %% it's maybe too much when reporting to the user
 -spec compact_errors(any(), any()) -> no_return().
 compact_errors(Schema, [Error0 | More]) when is_map(Error0) ->
-    Error1 = Error0#{discarded_errors_count => length(More)},
+    Error1 =
+        case length(More) of
+            0 ->
+                Error0;
+            _ ->
+                Error0#{unshown_errors => length(More)}
+        end,
     Error =
         case is_atom(Schema) of
             true ->
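
Note on the hunk above: the old code always attached a `discarded_errors_count` field (even when it was 0), while the new code adds an `unshown_errors` count only when more than one error was collected. A minimal illustrative sketch of that conditional (standalone code; `annotate_first_error/2` is a hypothetical name, not the actual emqx_hocon function):

    %% Illustrative only: attach a counter field only when extra errors exist.
    annotate_first_error(Error0, MoreErrors) when is_map(Error0) ->
        case length(MoreErrors) of
            0 -> Error0;
            N -> Error0#{unshown_errors => N}
        end.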

View File

@@ -188,8 +188,7 @@ t_create_invalid_config(_Config) ->
     ?assertMatch(
         {error, #{
             kind := validation_error,
-            path := "authorization.sources.1",
-            discarded_errors_count := 0
+            path := "authorization.sources.1"
         }},
         emqx_authz:update(?CMD_REPLACE, [C])
     ).

View File

@@ -63,11 +63,7 @@
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
 -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
--define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
--define(EXPAND(RESULT, BATCH), [
-    ?REPLY(FROM, REQUEST, SENT, RESULT)
-    || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH
-]).
+-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
 -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
     {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
 ).
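
The new `?REPLY` macro above drops the REQUEST element: a reply tuple now carries only the caller, the sent flag, and the result, while the request itself stays inside the `?QUERY` tuple. A hedged sketch of building a reply from a queued query under the new macros (`reply_for/2` is a hypothetical helper inside the module where these defines live, not part of the diff):

    %% Hypothetical helper: per the defines above, ?QUERY(...) expands to
    %% {query, From, Request, Sent, ExpireAt} and ?REPLY(...) to
    %% {reply, From, Sent, Result}.
    reply_for(?QUERY(From, _Request, HasBeenSent, _ExpireAt), Result) ->
        ?REPLY(From, HasBeenSent, Result).
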
@@ -242,8 +238,8 @@ blocked(cast, flush, Data) ->
     resume_from_blocked(Data);
 blocked(state_timeout, unblock, St) ->
     resume_from_blocked(St);
-blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
-    {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
+blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) ->
+    Data = collect_and_enqueue_query_requests(Request0, Data0),
     {keep_state, Data};
 blocked(info, {flush, _Ref}, _Data) ->
     keep_state_and_data;
@@ -370,8 +366,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     ReplyResult =
         case QueryOrBatch of
-            ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) ->
-                Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+            ?QUERY(From, _, HasBeenSent, _ExpireAt) ->
+                Reply = ?REPLY(From, HasBeenSent, Result),
                 reply_caller_defer_metrics(Id, Reply, QueryOpts);
             [?QUERY(_, _, _, _) | _] = Batch ->
                 batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
@@ -412,7 +408,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
 -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
     gen_statem:event_handler_result(state(), data()).
 handle_query_requests(Request0, Data0) ->
-    {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
+    Data = collect_and_enqueue_query_requests(Request0, Data0),
     maybe_flush(Data).
 
 collect_and_enqueue_query_requests(Request0, Data0) ->
@@ -437,9 +433,24 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
         end,
         Requests
     ),
-    NewQ = append_queue(Id, Index, Q, Queries),
-    Data = Data0#{queue := NewQ},
-    {Queries, Data}.
+    {Overflown, NewQ} = append_queue(Id, Index, Q, Queries),
+    ok = reply_overflown(Overflown),
+    Data0#{queue := NewQ}.
+
+reply_overflown([]) ->
+    ok;
+reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) ->
+    do_reply_caller(From, {error, buffer_overflow}),
+    reply_overflown(More).
+
+do_reply_caller(undefined, _Result) ->
+    ok;
+do_reply_caller({F, Args}, Result) when is_function(F) ->
+    _ = erlang:apply(F, Args ++ [Result]),
+    ok;
+do_reply_caller(From, Result) ->
+    _ = gen_statem:reply(From, Result),
+    ok.
 
 maybe_flush(Data0) ->
     #{
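
The new `do_reply_caller/2` above handles the three caller shapes the buffer worker deals with: `undefined` (fire-and-forget), a `{Fun, Args}` callback, and a `gen_statem` From tuple. A hedged, within-module usage sketch (the callback fun and the tag are made up; a real From tuple would come from an actual `gen_statem` call):

    %% No caller: the overflow reply is silently dropped.
    ok = do_reply_caller(undefined, {error, buffer_overflow}),
    %% Callback-style caller: the result is appended to the argument list.
    Log = fun(Tag, Result) -> io:format("~p: ~p~n", [Tag, Result]) end,
    ok = do_reply_caller({Log, [my_tag]}, {error, buffer_overflow}).
    %% gen_statem caller (From obtained from a sync call):
    %% ok = do_reply_caller(From, {error, buffer_overflow}).
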
@@ -533,10 +544,10 @@ do_flush(
         inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
-    [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch,
+    [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
-    Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+    Reply = ?REPLY(From, HasBeenSent, Result),
     case reply_caller(Id, Reply, QueryOpts) of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
@@ -690,6 +701,14 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
     ShouldBlock.
 
 batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
+    %% the `Mod:on_batch_query/3` returns a single result for a batch,
+    %% so we need to expand
+    Replies = lists:map(
+        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
+            ?REPLY(FROM, SENT, BatchResult)
+        end,
+        Batch
+    ),
     {ShouldAck, PostFns} =
         lists:foldl(
             fun(Reply, {_ShouldAck, PostFns}) ->
@@ -697,9 +716,7 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
                 {ShouldAck, [PostFn | PostFns]}
             end,
             {ack, []},
-            %% the `Mod:on_batch_query/3` returns a single result for a batch,
-            %% so we need to expand
-            ?EXPAND(BatchResult, Batch)
+            Replies
         ),
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
     {ShouldAck, PostFn}.
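
Context for the `?EXPAND` removal: `on_batch_query/3` yields one result for the whole batch, so the code now fans that single result out into one reply per queued query with the `lists:map` shown earlier in this diff instead of the list-comprehension macro. A small standalone sketch of the same fan-out, written with plain tuples instead of the module's macros:

    %% Illustrative only: one batch result becomes one reply per query.
    expand_batch_result(BatchResult, Batch) ->
        [{reply, From, Sent, BatchResult} || {query, From, _Req, Sent, _ExpireAt} <- Batch].
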
@@ -711,9 +728,9 @@ reply_caller(Id, Reply, QueryOpts) ->
 %% Should only reply to the caller when the decision is final (not
 %% retriable). See comment on `handle_query_result_pure'.
-reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) ->
+reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
     handle_query_result_pure(Id, Result, HasBeenSent);
-reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when
+reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when
     is_function(ReplyFun)
 ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
@@ -735,7 +752,7 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result),
         ok
     end,
     {ShouldAck, PostFn};
-reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) ->
+reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     IsUnrecoverableError = is_unrecoverable_error(Result),
     {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
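
The `reply_caller_defer_metrics` clauses above keep the existing pattern of returning `{ShouldAck, PostFn}`: the ack/nack decision is computed purely, and the metrics side effects are deferred to a post-function that runs once the outcome is final. A hedged sketch of how a call site might consume that pair (`Id`, `Reply`, and `QueryOpts` are assumed to be in scope; this is not a copy of the module's own call site):

    {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
    _ = PostFn(),  %% run the deferred metrics updates once the decision is final
    ShouldAck.
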
@@ -974,7 +991,7 @@ do_reply_after_query(
     Index,
     InflightTID,
     Ref,
-    ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+    ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
     QueryOpts,
     Result
 ) ->
@@ -982,14 +999,14 @@ do_reply_after_query(
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     {Action, PostFn} = reply_caller_defer_metrics(
-        Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts
+        Id, ?REPLY(From, HasBeenSent, Result), QueryOpts
     ),
     case Action of
         nack ->
             %% Keep retrying.
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),
@@ -998,7 +1015,7 @@
         ack ->
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),
@@ -1083,23 +1100,29 @@ queue_item_marshaller(Item) ->
 estimate_size(QItem) ->
     erlang:external_size(QItem).
 
--spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q().
-append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
-    %% we must not append a raw binary because the marshaller will get
-    %% lost.
+-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
+    {[queue_query()], replayq:q()}.
+append_queue(Id, Index, Q, Queries) ->
+    %% this assertion is to ensure that we never append a raw binary
+    %% because the marshaller will get lost.
+    false = is_binary(hd(Queries)),
     Q0 = replayq:append(Q, Queries),
-    Q2 =
+    {Overflown, Q2} =
         case replayq:overflow(Q0) of
-            Overflow when Overflow =< 0 ->
-                Q0;
-            Overflow ->
-                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+            OverflownBytes when OverflownBytes =< 0 ->
+                {[], Q0};
+            OverflownBytes ->
+                PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
                 {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
                 emqx_resource_metrics:dropped_queue_full_inc(Id),
-                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
-                Q1
+                ?SLOG(info, #{
+                    msg => buffer_worker_overflow,
+                    worker_id => Id,
+                    dropped => Dropped
+                }),
+                {Items2, Q1}
         end,
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
     ?tp(
@@ -1107,10 +1130,11 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
         #{
             id => Id,
             items => Queries,
-            queue_count => queue_count(Q2)
+            queue_count => queue_count(Q2),
+            overflown => length(Overflown)
         }
     ),
-    Q2.
+    {Overflown, Q2}.
 
 %%==============================================================================
 %% the inflight queue for async query
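
The reworked `append_queue/4` now returns both the queries shed by replayq on overflow and the resulting queue, so the caller can send `{error, buffer_overflow}` to the owners of the dropped items. A simplified, illustrative sketch of the same shedding policy, using a plain list and a count limit instead of replayq and byte limits (none of these names are from the EMQX code):

    %% Illustrative only: append, then drop the oldest items past the limit
    %% and hand them back so their callers can be notified.
    append_bounded(Queue, NewItems, MaxLen) ->
        Q1 = Queue ++ NewItems,
        case length(Q1) - MaxLen of
            Excess when Excess =< 0 ->
                {[], Q1};
            Excess ->
                {Dropped, Kept} = lists:split(Excess, Q1),
                {Dropped, Kept}
        end.
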

View File

@@ -1226,8 +1226,8 @@ t_always_overflow(_Config) ->
     Payload = binary:copy(<<"a">>, 100),
     %% since it's sync and it should never send a request, this
     %% errors with `timeout'.
-    ?assertError(
-        timeout,
+    ?assertEqual(
+        {error, buffer_overflow},
         emqx_resource:query(
             ?ID,
             {big_payload, Payload},
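
As the updated test above shows, a synchronous caller whose request is shed from a full buffer now gets an immediate `{error, buffer_overflow}` instead of waiting for a timeout. A hedged caller-side sketch (the resource id, the request term, `Payload`, and `handle_backpressure/0` are placeholders, not part of the PR):

    case emqx_resource:query(my_resource_id, {big_payload, Payload}) of
        {error, buffer_overflow} ->
            %% the buffer worker shed this request; back off and retry later
            handle_backpressure();
        Other ->
            Other
    end.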

View File

@@ -850,7 +850,6 @@ test_publish_success_batch(Config) ->
 t_not_a_json(Config) ->
     ?assertMatch(
         {error, #{
-            discarded_errors_count := 0,
             kind := validation_error,
             reason := #{exception := {error, {badmap, "not a json"}}},
             %% should be censored as it contains secrets
@@ -868,7 +867,6 @@ t_not_a_json(Config) ->
 t_not_of_service_account_type(Config) ->
     ?assertMatch(
         {error, #{
-            discarded_errors_count := 0,
             kind := validation_error,
             reason := {wrong_type, <<"not a service account">>},
             %% should be censored as it contains secrets
@@ -887,7 +885,6 @@ t_json_missing_fields(Config) ->
     GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config),
     ?assertMatch(
         {error, #{
-            discarded_errors_count := 0,
             kind := validation_error,
             reason :=
                 {missing_keys, [