From 2b4e49e7df5d286141713fa2d15a0cbc65f619b4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 24 Feb 2023 15:06:49 +0300 Subject: [PATCH] fix(bufworker): handle replies of simple async queries Before that change, simple queries were treated as "retries" essentially, thus skipping all the reply processing there is. --- .../src/emqx_resource_buffer_worker.erl | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 38de2dc34..a8ae4454d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -109,6 +109,7 @@ start_link(Id, Index, Opts) -> -spec sync_query(id(), request(), query_opts()) -> Result :: term(). sync_query(Id, Request, Opts0) -> + ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, sync), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -118,6 +119,7 @@ sync_query(Id, Request, Opts0) -> -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts0) -> + ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, async), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -133,6 +135,7 @@ simple_sync_query(Id, Request) -> %% call ends up calling buffering functions, that's a bug and %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. + ?tp(simple_sync_query, #{id => Id, request => Request}), Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), @@ -144,6 +147,7 @@ simple_sync_query(Id, Request) -> %% simple async-query the resource without batching and queuing. -spec simple_async_query(id(), request(), query_opts()) -> term(). simple_async_query(Id, Request, QueryOpts0) -> + ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}), Index = undefined, QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), @@ -877,7 +881,7 @@ handle_async_worker_down(Data0, Pid) -> {keep_state, Data}. call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> - ?tp(call_query_enter, #{id => Id, query => Query}), + ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -994,11 +998,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re handle_async_reply( #{ request_ref := Ref, - inflight_tid := InflightTID + inflight_tid := InflightTID, + query_opts := Opts } = ReplyContext, Result ) -> - case maybe_handle_unknown_async_reply(InflightTID, Ref) of + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of discard -> ok; continue -> @@ -1068,11 +1073,12 @@ do_handle_async_reply( handle_async_batch_reply( #{ inflight_tid := InflightTID, - request_ref := Ref + request_ref := Ref, + query_opts := Opts } = ReplyContext, Result ) -> - case maybe_handle_unknown_async_reply(InflightTID, Ref) of + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of discard -> ok; continue -> @@ -1206,7 +1212,9 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> %% 2. If the request was previously failed and now pending on a retry, %% then this function will return 'continue' as there is no way to %% tell if this reply is stae or not. -maybe_handle_unknown_async_reply(InflightTID, Ref) -> +maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) -> + continue; +maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) -> try ets:member(InflightTID, Ref) of true -> continue;