From f6b3b930b0716124aa514bd3ab4859db762eb3f4 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 19 Jan 2023 11:54:56 +0100 Subject: [PATCH 1/6] chore: improve an error log --- apps/emqx/src/emqx_config.erl | 8 +++++++- apps/emqx_authz/test/emqx_authz_redis_SUITE.erl | 3 +-- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 3 --- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index ba4095daa..204e32a2b 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -424,7 +424,13 @@ check_config(SchemaMod, RawConf, Opts0) -> %% it's maybe too much when reporting to the user -spec compact_errors(any(), any()) -> no_return(). compact_errors(Schema, [Error0 | More]) when is_map(Error0) -> - Error1 = Error0#{discarded_errors_count => length(More)}, + Error1 = + case length(More) of + 0 -> + Error0; + _ -> + Error0#{unshown_errors => length(More)} + end, Error = case is_atom(Schema) of true -> diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 15b180c96..c07d920ad 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -188,8 +188,7 @@ t_create_invalid_config(_Config) -> ?assertMatch( {error, #{ kind := validation_error, - path := "authorization.sources.1", - discarded_errors_count := 0 + path := "authorization.sources.1" }}, emqx_authz:update(?CMD_REPLACE, [C]) ). 
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 247b7799b..222acb77b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -850,7 +850,6 @@ test_publish_success_batch(Config) -> t_not_a_json(Config) -> ?assertMatch( {error, #{ - discarded_errors_count := 0, kind := validation_error, reason := #{exception := {error, {badmap, "not a json"}}}, %% should be censored as it contains secrets @@ -868,7 +867,6 @@ t_not_a_json(Config) -> t_not_of_service_account_type(Config) -> ?assertMatch( {error, #{ - discarded_errors_count := 0, kind := validation_error, reason := {wrong_type, <<"not a service account">>}, %% should be censored as it contains secrets @@ -887,7 +885,6 @@ t_json_missing_fields(Config) -> GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), ?assertMatch( {error, #{ - discarded_errors_count := 0, kind := validation_error, reason := {missing_keys, [ From bb26632c8a3d808a554302735dd77cd4d135fadb Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 14:33:16 +0100 Subject: [PATCH 2/6] fix(buffer_worker): fix a wrong assertion the assertion is to ensure queue items are not binary but should not assert the queue itself --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 11d3753f0..eaae64dd8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1084,9 +1084,10 @@ estimate_size(QItem) -> erlang:external_size(QItem). -spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q(). 
-append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> - %% we must not append a raw binary because the marshaller will get - %% lost. +append_queue(Id, Index, Q, Queries) -> + %% this assertion is to ensure that we never append a raw binary + %% because the marshaller will get lost. + false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), Q2 = case replayq:overflow(Q0) of From 25b4821adc53b885b0a8641799b2e37834c1de1a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 14:40:41 +0100 Subject: [PATCH 3/6] refactor: move the per-message overflow log from error to info level --- .../src/emqx_resource_buffer_worker.erl | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index eaae64dd8..28e2c785d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -437,7 +437,7 @@ collect_and_enqueue_query_requests(Request0, Data0) -> end, Requests ), - NewQ = append_queue(Id, Index, Q, Queries), + {_Overflow, NewQ} = append_queue(Id, Index, Q, Queries), Data = Data0#{queue := NewQ}, {Queries, Data}. @@ -1089,18 +1089,22 @@ append_queue(Id, Index, Q, Queries) -> %% because the marshaller will get lost. 
false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), - Q2 = + {Overflow, Q2} = case replayq:overflow(Q0) of - Overflow when Overflow =< 0 -> - Q0; - Overflow -> - PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + OverflowBytes when OverflowBytes =< 0 -> + {[], Q0}; + OverflowBytes -> + PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999}, {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), emqx_resource_metrics:dropped_queue_full_inc(Id), - ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), - Q1 + ?SLOG(info, #{ + msg => buffer_worker_overflow, + worker_id => Id, + dropped => Dropped + }), + {Items2, Q1} end, emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp( @@ -1108,10 +1112,11 @@ append_queue(Id, Index, Q, Queries) -> #{ id => Id, items => Queries, - queue_count => queue_count(Q2) + queue_count => queue_count(Q2), + overflow => length(Overflow) } ), - Q2. + {Overflow, Q2}. 
%%============================================================================== %% the inflight queue for async query From ed2878916447dca24f03a022ba50014cc70b2f2a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 14:50:40 +0100 Subject: [PATCH 4/6] refactor(buffer_worker): no need to return after collect into buf queue --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 28e2c785d..63a402daa 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -243,7 +243,7 @@ blocked(cast, flush, Data) -> blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> - {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), + Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> keep_state_and_data; @@ -412,7 +412,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> gen_statem:event_handler_result(state(), data()). handle_query_requests(Request0, Data0) -> - {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), + Data = collect_and_enqueue_query_requests(Request0, Data0), maybe_flush(Data). collect_and_enqueue_query_requests(Request0, Data0) -> @@ -438,8 +438,7 @@ collect_and_enqueue_query_requests(Request0, Data0) -> Requests ), {_Overflow, NewQ} = append_queue(Id, Index, Q, Queries), - Data = Data0#{queue := NewQ}, - {Queries, Data}. + Data0#{queue := NewQ}. 
maybe_flush(Data0) -> #{ From 1f799dfd59e21b8901cabd31ddaa7570eb09fc98 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 16:56:32 +0100 Subject: [PATCH 5/6] fix: reply with {error, buffer_overflow} when discarded --- .../src/emqx_resource_buffer_worker.erl | 35 ++++++++++++++----- .../test/emqx_resource_SUITE.erl | 4 +-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 63a402daa..3d08f0289 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -242,7 +242,7 @@ blocked(cast, flush, Data) -> resume_from_blocked(Data); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); -blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> +blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> @@ -437,9 +437,25 @@ collect_and_enqueue_query_requests(Request0, Data0) -> end, Requests ), - {_Overflow, NewQ} = append_queue(Id, Index, Q, Queries), + {Overflown, NewQ} = append_queue(Id, Index, Q, Queries), + ok = reply_overflown(Overflown), Data0#{queue := NewQ}. +reply_overflown([]) -> + ok; +reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) -> + do_reply_caller(From, {error, buffer_overflow}), + reply_overflown(More). + +do_reply_caller(undefined, _Result) -> + ok; +do_reply_caller({F, Args}, Result) when is_function(F) -> + _ = erlang:apply(F, Args ++ [Result]), + ok; +do_reply_caller(From, Result) -> + _ = gen_statem:reply(From, Result), + ok. + maybe_flush(Data0) -> #{ batch_size := BatchSize, @@ -1082,18 +1098,19 @@ queue_item_marshaller(Item) -> estimate_size(QItem) -> erlang:external_size(QItem). 
--spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q(). +-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> + {[queue_query()], replayq:q()}. append_queue(Id, Index, Q, Queries) -> %% this assertion is to ensure that we never append a raw binary %% because the marshaller will get lost. false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), - {Overflow, Q2} = + {Overflown, Q2} = case replayq:overflow(Q0) of - OverflowBytes when OverflowBytes =< 0 -> + OverflownBytes when OverflownBytes =< 0 -> {[], Q0}; - OverflowBytes -> - PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999}, + OverflownBytes -> + PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999}, {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), @@ -1112,10 +1129,10 @@ append_queue(Id, Index, Q, Queries) -> id => Id, items => Queries, queue_count => queue_count(Q2), - overflow => length(Overflow) + overflown => length(Overflown) } ), - {Overflow, Q2}. + {Overflown, Q2}. %%============================================================================== %% the inflight queue for async query diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 9b2af74f6..34a92a5a2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1226,8 +1226,8 @@ t_always_overflow(_Config) -> Payload = binary:copy(<<"a">>, 100), %% since it's sync and it should never send a request, this %% errors with `timeout'. 
- ?assertError( - timeout, + ?assertEqual( + {error, buffer_overflow}, emqx_resource:query( ?ID, {big_payload, Payload}, From d4fab92b72237b850b4b7e231eca9e295c48a2b6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 18:00:20 +0100 Subject: [PATCH 6/6] refactor(buffer_worker): no need to keep request for REPLY macro --- .../src/emqx_resource_buffer_worker.erl | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 3d08f0289..5460a8198 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -63,11 +63,7 @@ -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). --define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). --define(EXPAND(RESULT, BATCH), [ - ?REPLY(FROM, REQUEST, SENT, RESULT) - || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH -]). +-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), {Ref, BatchOrQuery, IsRetriable, WorkerMRef} ). 
@@ -370,8 +366,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of - ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) -> - Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + ?QUERY(From, _, HasBeenSent, _ExpireAt) -> + Reply = ?REPLY(From, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) @@ -548,10 +544,10 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch, + [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), - Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + Reply = ?REPLY(From, HasBeenSent, Result), case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. @@ -705,6 +701,14 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) -> ShouldBlock. batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> + %% the `Mod:on_batch_query/3` returns a single result for a batch, + %% so we need to expand + Replies = lists:map( + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> + ?REPLY(FROM, SENT, BatchResult) + end, + Batch + ), {ShouldAck, PostFns} = lists:foldl( fun(Reply, {_ShouldAck, PostFns}) -> @@ -712,9 +716,7 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> {ShouldAck, [PostFn | PostFns]} end, {ack, []}, - %% the `Mod:on_batch_query/3` returns a single result for a batch, - %% so we need to expand - ?EXPAND(BatchResult, Batch) + Replies ), PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, {ShouldAck, PostFn}. 
@@ -726,9 +728,9 @@ reply_caller(Id, Reply, QueryOpts) -> %% Should only reply to the caller when the decision is final (not %% retriable). See comment on `handle_query_result_pure'. -reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) -> +reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> handle_query_result_pure(Id, Result, HasBeenSent); -reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when +reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when is_function(ReplyFun) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), @@ -750,7 +752,7 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), ok end, {ShouldAck, PostFn}; -reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) -> +reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsUnrecoverableError = is_unrecoverable_error(Result), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), @@ -989,7 +991,7 @@ do_reply_after_query( Index, InflightTID, Ref, - ?QUERY(From, Request, HasBeenSent, _ExpireAt), + ?QUERY(From, _Request, HasBeenSent, _ExpireAt), QueryOpts, Result ) -> @@ -997,14 +999,14 @@ do_reply_after_query( %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics( - Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts + Id, ?REPLY(From, HasBeenSent, Result), QueryOpts ), case Action of nack -> %% Keep retrying. 
?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -1013,7 +1015,7 @@ do_reply_after_query( ack -> ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }),