From 47f796dd12c6c79424ac3a8c5eb03ae95bdac27f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 18 Jan 2023 15:07:38 -0300 Subject: [PATCH 1/4] refactor: rename `emqx_resource_worker` -> `emqx_resource_buffer_worker` To make it more clear that it's purpose is serve as a buffering layer. --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- .../test/emqx_bridge_mqtt_SUITE.erl | 4 +- apps/emqx_resource/src/emqx_resource.erl | 10 ++-- ...er.erl => emqx_resource_buffer_worker.erl} | 42 ++++++------- ...rl => emqx_resource_buffer_worker_sup.erl} | 8 +-- .../src/emqx_resource_manager.erl | 8 +-- apps/emqx_resource/src/emqx_resource_sup.erl | 4 +- .../test/emqx_resource_SUITE.erl | 60 +++++++++---------- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 2 +- .../test/emqx_ee_bridge_mysql_SUITE.erl | 2 +- .../test/emqx_ee_bridge_pgsql_SUITE.erl | 2 +- 11 files changed, 72 insertions(+), 72 deletions(-) rename apps/emqx_resource/src/{emqx_resource_worker.erl => emqx_resource_buffer_worker.erl} (97%) rename apps/emqx_resource/src/{emqx_resource_worker_sup.erl => emqx_resource_buffer_worker_sup.erl} (94%) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 59b41c7fa..5b3fe796b 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -65,7 +65,7 @@ load() -> fun({Type, NamedConf}) -> lists:foreach( fun({Name, Conf}) -> - %% fetch opts for `emqx_resource_worker` + %% fetch opts for `emqx_resource_buffer_worker` ResOpts = emqx_resource:fetch_creation_opts(Conf), safe_load_bridge(Type, Name, Conf, ResOpts) end, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index d20d3bc10..1b4eac73e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -886,9 +886,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> {ok, SRef} = snabbkaffe:subscribe( fun - (#{?snk_kind := resource_worker_retry_inflight_failed}) -> + (#{?snk_kind := buffer_worker_retry_inflight_failed}) -> true; - (#{?snk_kind := resource_worker_flush_nack}) -> + (#{?snk_kind := buffer_worker_flush_nack}) -> true; (_) -> false diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0fd21bfcd..bb27b6acd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -255,7 +255,7 @@ reset_metrics(ResId) -> query(ResId, Request) -> query(ResId, Request, #{}). --spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> +-spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of @@ -263,11 +263,11 @@ query(ResId, Request, Opts) -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> - emqx_resource_worker:simple_sync_query(ResId, Request); + emqx_resource_buffer_worker:simple_sync_query(ResId, Request); {false, sync} -> - emqx_resource_worker:sync_query(ResId, Request, Opts); + emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> - emqx_resource_worker:async_query(ResId, Request, Opts) + emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -275,7 +275,7 @@ query(ResId, Request, Opts) -> -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> - emqx_resource_worker:simple_sync_query(ResId, Request). + emqx_resource_buffer_worker:simple_sync_query(ResId, Request). -spec start(resource_id()) -> ok | {error, Reason :: term()}. start(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl similarity index 97% rename from apps/emqx_resource/src/emqx_resource_worker.erl rename to apps/emqx_resource/src/emqx_resource_buffer_worker.erl index d7010c3bd..4b16ed3d5 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -17,7 +17,7 @@ %% This module implements async message sending, disk message queuing, %% and message batching using ReplayQ. --module(emqx_resource_worker). +-module(emqx_resource_buffer_worker). -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). @@ -176,11 +176,11 @@ init({Id, Index, Opts}) -> resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined }, - ?tp(resource_worker_init, #{id => Id, index => Index}), + ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. running(enter, _, St) -> - ?tp(resource_worker_enter_running, #{}), + ?tp(buffer_worker_enter_running, #{}), maybe_flush(St); running(cast, resume, _St) -> keep_state_and_data; @@ -206,7 +206,7 @@ running(info, Info, _St) -> keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = _St) -> - ?tp(resource_worker_enter_blocked, #{}), + ?tp(buffer_worker_enter_blocked, #{}), {keep_state_and_data, {state_timeout, ResumeT, unblock}}; blocked(cast, block, _St) -> keep_state_and_data; @@ -315,7 +315,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> index := Index, resume_interval := ResumeT } = Data0, - ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), + ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{simple_query => false}, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = @@ -331,7 +331,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {nack, PostFn} -> PostFn(), ?tp( - resource_worker_retry_inflight_failed, + buffer_worker_retry_inflight_failed, #{ ref => Ref, query_or_batch => QueryOrBatch @@ -349,7 +349,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% we bump the counter when removing it from the table. IsAcked andalso PostFn(), ?tp( - resource_worker_retry_inflight_succeeded, + buffer_worker_retry_inflight_succeeded, #{ ref => Ref, query_or_batch => QueryOrBatch @@ -415,7 +415,7 @@ flush(Data0) -> {0, _} -> {keep_state, Data1}; {_, true} -> - ?tp(resource_worker_flush_but_inflight_full, #{}), + ?tp(buffer_worker_flush_but_inflight_full, #{}), Data2 = ensure_flush_timer(Data1), {keep_state, Data2}; {_, false} -> @@ -483,7 +483,7 @@ do_flush( store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_nack, + buffer_worker_flush_nack, #{ ref => Ref, is_retriable => IsRetriable, @@ -512,7 +512,7 @@ do_flush( store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_ack, + buffer_worker_flush_ack, #{ batch_or_query => Request, result => Result @@ -560,7 +560,7 @@ do_flush(Data0, #{ store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_nack, + buffer_worker_flush_nack, #{ ref => Ref, is_retriable => IsRetriable, @@ -589,7 +589,7 @@ do_flush(Data0, #{ store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_ack, + buffer_worker_flush_ack, #{ batch_or_query => Batch, result => Result @@ -873,7 +873,7 @@ reply_after_query( case Action of nack -> %% Keep retrying. - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), ref => Ref, @@ -882,7 +882,7 @@ reply_after_query( mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), ref => Ref, @@ -903,7 +903,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu case Action of nack -> %% Keep retrying. - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => nack, batch_or_query => Batch, ref => Ref, @@ -912,7 +912,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => ack, batch_or_query => Batch, ref => Ref, @@ -955,7 +955,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> end, emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp( - resource_worker_appended_to_queue, + buffer_worker_appended_to_queue, #{ id => Id, items => Queries, @@ -973,7 +973,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( - emqx_resource_worker_inflight_tab, + emqx_resource_buffer_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), @@ -1040,7 +1040,7 @@ inflight_append( BatchSize = length(Batch), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), + ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; inflight_append( InflightTID, @@ -1053,7 +1053,7 @@ inflight_append( IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), + ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> ets:insert(InflightTID, {Ref, Data}), @@ -1130,7 +1130,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), - ?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}), + ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}), ok. %%============================================================================== diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl similarity index 94% rename from apps/emqx_resource/src/emqx_resource_worker_sup.erl rename to apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 8b0ce2c65..4987946c9 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_resource_worker_sup). +-module(emqx_resource_buffer_worker_sup). -behaviour(supervisor). %%%============================================================================= @@ -99,7 +99,7 @@ ensure_worker_added(ResId, Idx) -> -define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). ensure_worker_started(ResId, Idx, Opts) -> - Mod = emqx_resource_worker, + Mod = emqx_resource_buffer_worker, Spec = #{ id => ?CHILD_ID(Mod, ResId, Idx), start => {Mod, start_link, [ResId, Idx, Opts]}, @@ -116,7 +116,7 @@ ensure_worker_started(ResId, Idx, Opts) -> end. ensure_worker_removed(ResId, Idx) -> - ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), + ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx), case supervisor:terminate_child(?SERVER, ChildId) of ok -> Res = supervisor:delete_child(?SERVER, ChildId), @@ -129,7 +129,7 @@ ensure_worker_removed(ResId, Idx) -> end. ensure_disk_queue_dir_absent(ResourceId, Index) -> - ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index), + ok = emqx_resource_buffer_worker:clear_disk_queue_dir(ResourceId, Index), ok. ensure_worker_pool_removed(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index ab726976a..95d1ed1d2 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -150,7 +150,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> %% buffer, so there is no need for resource workers ok; false -> - ok = emqx_resource_worker_sup:start_workers(ResId, Opts), + ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); @@ -473,7 +473,7 @@ retry_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> stop_resource(Data), - ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), + ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok @@ -587,9 +587,9 @@ maybe_alarm(_Status, ResId) -> maybe_resume_resource_workers(connected) -> lists:foreach( fun({_, Pid, _, _}) -> - emqx_resource_worker:resume(Pid) + emqx_resource_buffer_worker:resume(Pid) end, - supervisor:which_children(emqx_resource_worker_sup) + supervisor:which_children(emqx_resource_buffer_worker_sup) ); maybe_resume_resource_workers(_) -> ok. diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index ea31b8b6b..4d9abb03d 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -39,8 +39,8 @@ init([]) -> modules => [emqx_resource_manager_sup] }, WorkerSup = #{ - id => emqx_resource_worker_sup, - start => {emqx_resource_worker_sup, start_link, []}, + id => emqx_resource_buffer_worker_sup, + start => {emqx_resource_buffer_worker_sup, start_link, []}, restart => permanent, shutdown => infinity, type => supervisor diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 97bc8da66..c9325af2b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -414,7 +414,7 @@ t_query_counter_async_inflight(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(WindowSize, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), fun(Trace) -> @@ -439,7 +439,7 @@ t_query_counter_async_inflight(_) -> emqx_resource:query(?ID, {inc_counter, 99}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), - #{?snk_kind := resource_worker_appended_to_queue}, + #{?snk_kind := buffer_worker_appended_to_queue}, 1_000 ), tap_metrics(?LINE), @@ -490,7 +490,7 @@ t_query_counter_async_inflight(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(WindowSize, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), fun(Trace) -> @@ -596,7 +596,7 @@ t_query_counter_async_inflight_batch(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(NumMsgs, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), fun(Trace) -> @@ -623,7 +623,7 @@ t_query_counter_async_inflight_batch(_) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {inc_counter, 2}), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), ?assertMatch(0, ets:info(Tab0, size)), @@ -646,7 +646,7 @@ t_query_counter_async_inflight_batch(_) -> emqx_resource:query(?ID, {inc_counter, 3}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), - #{?snk_kind := resource_worker_appended_to_queue}, + #{?snk_kind := buffer_worker_appended_to_queue}, 1_000 ), tap_metrics(?LINE), @@ -706,7 +706,7 @@ t_query_counter_async_inflight_batch(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(NumMsgs, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), fun(Trace) -> @@ -1055,7 +1055,7 @@ t_retry_batch(_Config) -> end, Payloads ), - #{?snk_kind := resource_worker_enter_blocked}, + #{?snk_kind := buffer_worker_enter_blocked}, 5_000 ), %% now the individual messages should have been counted @@ -1066,7 +1066,7 @@ t_retry_batch(_Config) -> %% batch shall remain enqueued. {ok, _} = snabbkaffe:block_until( - ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}), + ?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}), 5_000 ), %% should not have increased the matched count with the retries @@ -1078,7 +1078,7 @@ t_retry_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, 5_000 ), %% 1 more because of the `resume' call @@ -1140,7 +1140,7 @@ t_delete_and_re_create_with_same_name(_Config) -> ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), NumRequests = 10, {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_enter_blocked}), + ?match_event(#{?snk_kind := buffer_worker_enter_blocked}), NumBufferWorkers, _Timeout = 5_000 ), @@ -1189,7 +1189,7 @@ t_delete_and_re_create_with_same_name(_Config) -> resume_interval => 1_000 } ), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, 5_000 ), @@ -1271,13 +1271,13 @@ t_retry_sync_inflight(_Config) -> Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), TestPid ! {res, Res} end), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), receive @@ -1322,13 +1322,13 @@ t_retry_sync_inflight_batch(_Config) -> Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), TestPid ! {res, Res} end), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), receive @@ -1368,7 +1368,7 @@ t_retry_async_inflight(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1376,7 +1376,7 @@ t_retry_async_inflight(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, ResumeInterval * 2 ), ok @@ -1411,7 +1411,7 @@ t_retry_async_inflight_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1419,7 +1419,7 @@ t_retry_async_inflight_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, ResumeInterval * 2 ), ok @@ -1459,7 +1459,7 @@ t_async_pool_worker_death(_Config) -> NumReqs = 10, {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}), + ?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}), NumReqs, 1_000 ), @@ -1472,7 +1472,7 @@ t_async_pool_worker_death(_Config) -> %% grab one of the worker pids and kill it {ok, SRef1} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_worker_down_update}), + ?match_event(#{?snk_kind := buffer_worker_worker_down_update}), NumBufferWorkers, 10_000 ), @@ -1568,8 +1568,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := resource_worker_flush_nack, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_flush_nack, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) ), @@ -1577,8 +1577,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> %% before restoring the resource health. ?assert( ?causality( - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref}, Trace ) ), @@ -1588,8 +1588,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) ), @@ -1597,8 +1597,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> %% before restoring the resource health. ?assert( ?causality( - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref}, Trace ) ), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 0372c21ea..fc7dce418 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -914,7 +914,7 @@ t_write_failure(Config) -> fun(Trace0) -> case QueryMode of sync -> - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([_ | _], Trace), [#{result := Result} | _] = Trace, ?assert( diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index ce38c357d..3bac01c66 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -413,7 +413,7 @@ t_write_failure(Config) -> end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, case Error of diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index f39ecc1dc..bdbbed8cf 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -431,7 +431,7 @@ t_write_failure(Config) -> end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, case Error of From a5424959c6fec3fa1ddb0bf4efd1bf71d6637206 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 18 Jan 2023 15:32:44 -0300 Subject: [PATCH 2/4] refactor: avoid operator precedence bugs --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl | 3 ++- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 71f8a8399..fadf05848 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -95,7 +95,8 @@ fields("config") -> } )} ] ++ - emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); + (emqx_connector_mysql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 7e21d4dd7..8bf7b1969 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -97,7 +97,8 @@ fields("config") -> } )} ] ++ - emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); + (emqx_connector_pgsql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; From 6fa6c679bb7f5c3dbe74a3450984d9875d63ca2a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 19 Jan 2023 18:07:08 -0300 Subject: [PATCH 3/4] feat(buffer_worker): add expiration time to requests With this, we avoid performing work or replying to callers that are no longer waiting on a result. Also introduces two new counters: - `dropped.expired` :: happens when a request expires before being sent downstream - `late_reply` :: when a response is receive from downstream, but the caller is no longer for a reply because the request has expired, and the caller might even have retried it. --- apps/emqx_bridge/include/emqx_bridge.hrl | 10 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 12 +- .../test/emqx_bridge_mqtt_SUITE.erl | 15 +- apps/emqx_resource/include/emqx_resource.hrl | 2 + .../src/emqx_resource_buffer_worker.erl | 319 ++++++-- .../src/emqx_resource_manager.erl | 2 + .../src/emqx_resource_metrics.erl | 37 + .../test/emqx_connector_demo.erl | 2 +- .../test/emqx_resource_SUITE.erl | 770 +++++++++++++++++- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 8 +- .../src/emqx_ee_connector_gcp_pubsub.erl | 1 - 11 files changed, 1090 insertions(+), 88 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index d8229cc77..d062b4b7f 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -16,19 +16,21 @@ -define(EMPTY_METRICS, ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ) ). -define(METRICS( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, @@ -40,12 +42,14 @@ #{ 'dropped' => Dropped, 'dropped.other' => DroppedOther, + 'dropped.expired' => DroppedExpired, 'dropped.queue_full' => DroppedQueueFull, 'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, 'queuing' => Queued, 'retried' => Retried, + 'late_reply' => LateReply, 'failed' => SentFailed, 'inflight' => SentInflight, 'success' => SentSucc, @@ -59,12 +63,14 @@ -define(metrics( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, @@ -76,12 +82,14 @@ #{ 'dropped' := Dropped, 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, 'dropped.queue_full' := DroppedQueueFull, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queuing' := Queued, 'retried' := Retried, + 'late_reply' := LateReply, 'failed' := SentFailed, 'inflight' := SentInflight, 'success' := SentSucc, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 50505effc..2c43ce5d7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -751,11 +751,11 @@ aggregate_metrics(AllMetrics) -> fun( #{ metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15 + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 ) }, ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15 + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 ) ) -> ?METRICS( @@ -773,7 +773,9 @@ aggregate_metrics(AllMetrics) -> M12 + N12, M13 + N13, M14 + N14, - M15 + N15 + M15 + N15, + M16 + N16, + M17 + N17 ) end, InitMetrics, @@ -805,11 +807,13 @@ format_metrics(#{ counters := #{ 'dropped' := Dropped, 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, 'dropped.queue_full' := DroppedQueueFull, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'retried' := Retried, + 'late_reply' := LateReply, 'failed' := SentFailed, 'success' := SentSucc, 'received' := Rcvd @@ -824,12 +828,14 @@ format_metrics(#{ ?METRICS( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1b4eac73e..1f5b06fab 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -830,7 +830,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"resource_opts">> => #{ <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, - <<"request_timeout">> => <<"500ms">>, + %% using a long time so we can test recovery + <<"request_timeout">> => <<"15s">>, %% to make it check the healthy quickly <<"health_check_interval">> => <<"0.5s">> } @@ -898,8 +899,10 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ), Payload1 = <<"hello2">>, Payload2 = <<"hello3">>, - emqx:publish(emqx_message:make(LocalTopic, Payload1)), - emqx:publish(emqx_message:make(LocalTopic, Payload2)), + %% we need to to it in other processes because it'll block due to + %% the long timeout + spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end), + spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end), {ok, _} = snabbkaffe:receive_events(SRef), %% verify the metrics of the bridge, the message should be queued @@ -917,9 +920,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, - <<"queuing">> := 1, - <<"inflight">> := 1 - } when Matched >= 3, + <<"queuing">> := Queuing, + <<"inflight">> := Inflight + } when Matched >= 3 andalso Inflight + Queuing == 2, maps:get(<<"metrics">>, DecodedMetrics1) ), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d7b080ae8..4f2a4883b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -29,6 +29,8 @@ -type query_opts() :: #{ %% The key used for picking a resource worker pick_key => term(), + timeout => timeout(), + expire_at => infinity | integer(), async_reply_fun => reply_fun() }. -type resource_data() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 4b16ed3d5..8b79ce5a8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -60,21 +60,23 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). --define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). -define(EXPAND(RESULT, BATCH), [ ?REPLY(FROM, REQUEST, SENT, RESULT) - || ?QUERY(FROM, REQUEST, SENT) <- BATCH + || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH ]). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), {Ref, BatchOrQuery, IsRetriable, WorkerMRef} ). +-define(ITEM_IDX, 2). -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). -type id() :: binary(). -type index() :: pos_integer(). --type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()). +-type expire_at() :: infinity | integer(). +-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type from() :: pid() | reply_fun() | request_from(). -type request_from() :: undefined | gen_statem:from(). @@ -98,14 +100,18 @@ start_link(Id, Index, Opts) -> gen_statem:start_link(?MODULE, {Id, Index, Opts}, []). -spec sync_query(id(), request(), query_opts()) -> Result :: term(). -sync_query(Id, Request, Opts) -> +sync_query(Id, Request, Opts0) -> + Opts1 = ensure_timeout_query_opts(Opts0, sync), + Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), - Timeout = maps:get(timeout, Opts, timer:seconds(15)), + Timeout = maps:get(timeout, Opts), emqx_resource_metrics:matched_inc(Id), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). -async_query(Id, Request, Opts) -> +async_query(Id, Request, Opts0) -> + Opts1 = ensure_timeout_query_opts(Opts0, async), + Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), emqx_resource_metrics:matched_inc(Id), pick_cast(Id, PickKey, {query, Request, Opts}). @@ -120,11 +126,15 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts = #{simple_query => true}, + QueryOpts0 = #{simple_query => true, timeout => infinity}, + QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_message_ref(), - Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), HasBeenSent = false, + From = self(), + Result = call_query( + sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts + ), _ = handle_query_result(Id, Result, HasBeenSent), Result. @@ -179,9 +189,14 @@ init({Id, Index, Opts}) -> ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. -running(enter, _, St) -> +running(enter, _, Data) -> ?tp(buffer_worker_enter_running, #{}), - maybe_flush(St); + %% According to `gen_statem' laws, we mustn't call `maybe_flush' + %% directly because it may decide to return `{next_state, blocked, _}', + %% and that's an invalid response for a state enter call. + %% Returning a next event from a state enter call is also + %% prohibited. + {keep_state, ensure_flush_timer(Data, 0)}; running(cast, resume, _St) -> keep_state_and_data; running(cast, flush, Data) -> @@ -243,9 +258,9 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== --define(PICK(ID, KEY, EXPR), +-define(PICK(ID, KEY, PID, EXPR), try gproc_pool:pick_worker(ID, KEY) of - Pid when is_pid(Pid) -> + PID when is_pid(PID) -> EXPR; _ -> ?RESOURCE_ERROR(worker_not_created, "resource not created") @@ -258,7 +273,7 @@ code_change(_OldVsn, State, _Extra) -> ). pick_call(Id, Key, Query, Timeout) -> - ?PICK(Id, Key, begin + ?PICK(Id, Key, Pid, begin Caller = self(), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), From = {Caller, MRef}, @@ -281,15 +296,21 @@ pick_call(Id, Key, Query, Timeout) -> end). pick_cast(Id, Key, Query) -> - ?PICK(Id, Key, begin + ?PICK(Id, Key, Pid, begin From = undefined, erlang:send(Pid, ?SEND_REQ(From, Query)), ok end). resume_from_blocked(Data) -> - #{inflight_tid := InflightTID} = Data, - case inflight_get_first_retriable(InflightTID) of + ?tp(buffer_worker_resume_from_blocked_enter, #{}), + #{ + id := Id, + index := Index, + inflight_tid := InflightTID + } = Data, + Now = now_(), + case inflight_get_first_retriable(InflightTID, Now) of none -> case is_inflight_full(InflightTID) of true -> @@ -297,14 +318,32 @@ resume_from_blocked(Data) -> false -> {next_state, running, Data} end; - {Ref, FirstQuery} -> + {expired, Ref, Batch} -> + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), + ?tp(buffer_worker_retry_expired, #{expired => Batch}), + resume_from_blocked(Data); + {single, Ref, Query} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. case is_inflight_full(InflightTID) of true -> {keep_state, Data}; false -> - retry_inflight_sync(Ref, FirstQuery, Data) + retry_inflight_sync(Ref, Query, Data) + end; + {batch, Ref, NotExpired, Expired} -> + update_inflight_item(InflightTID, Ref, NotExpired), + NumExpired = length(Expired), + emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), + %% We retry msgs in inflight window sync, as if we send them + %% async, they will be appended to the end of inflight window again. + case is_inflight_full(InflightTID) of + true -> + {keep_state, Data}; + false -> + retry_inflight_sync(Ref, NotExpired, Data) end end. @@ -320,10 +359,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of - ?QUERY(From, CoreReq, HasBeenSent) -> + ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) -> Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); - [?QUERY(_, _, _) | _] = Batch -> + [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, case ReplyResult of @@ -378,10 +417,12 @@ collect_and_enqueue_query_requests(Request0, Data0) -> (?SEND_REQ(undefined = _From, {query, Req, Opts})) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), HasBeenSent = false, - ?QUERY(ReplyFun, Req, HasBeenSent); - (?SEND_REQ(From, {query, Req, _Opts})) -> + ExpireAt = maps:get(expire_at, Opts), + ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); + (?SEND_REQ(From, {query, Req, Opts})) -> HasBeenSent = false, - ?QUERY(From, Req, HasBeenSent) + ExpireAt = maps:get(expire_at, Opts), + ?QUERY(From, Req, HasBeenSent, ExpireAt) end, Requests ), @@ -406,6 +447,8 @@ maybe_flush(Data0) -> -spec flush(data()) -> gen_statem:event_handler_result(state(), data()). flush(Data0) -> #{ + id := Id, + index := Index, batch_size := BatchSize, inflight_tid := InflightTID, queue := Q0 @@ -419,25 +462,45 @@ flush(Data0) -> Data2 = ensure_flush_timer(Data1), {keep_state, Data2}; {_, false} -> + ?tp(buffer_worker_flush_before_pop, #{}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), - IsBatch = BatchSize =/= 1, - %% We *must* use the new queue, because we currently can't - %% `nack' a `pop'. - %% Maybe we could re-open the queue? Data2 = Data1#{queue := Q1}, - Ref = make_message_ref(), - do_flush(Data2, #{ - new_queue => Q1, - is_batch => IsBatch, - batch => Batch, - ref => Ref, - ack_ref => QAckRef - }) + ?tp(buffer_worker_flush_before_sieve_expired, #{}), + Now = now_(), + %% if the request has expired, the caller is no longer + %% waiting for a response. + case sieve_expired_requests(Batch, Now) of + all_expired -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(buffer_worker_flush_all_expired, #{batch => Batch}), + flush(Data2); + {NotExpired, Expired} -> + NumExpired = length(Expired), + emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + IsBatch = BatchSize =/= 1, + %% We *must* use the new queue, because we currently can't + %% `nack' a `pop'. + %% Maybe we could re-open the queue? + ?tp( + buffer_worker_flush_potentially_partial, + #{expired => Expired, not_expired => NotExpired} + ), + Ref = make_message_ref(), + do_flush(Data2, #{ + new_queue => Q1, + is_batch => IsBatch, + batch => NotExpired, + ref => Ref, + ack_ref => QAckRef + }) + end end. -spec do_flush(data(), #{ is_batch := boolean(), - batch := [?QUERY(from(), request(), boolean())], + batch := [queue_query()], ack_ref := replayq:ack_ref(), ref := inflight_key(), new_queue := replayq:q() @@ -459,7 +522,7 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, + [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), @@ -812,10 +875,10 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async }), @@ -834,13 +897,13 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt end, Request ); -apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = [Request || ?QUERY(_From, Request, _) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async }), @@ -850,7 +913,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt begin ReplyFun = fun ?MODULE:batch_reply_after_query/8, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, - Requests = [Request || ?QUERY(_From, Request, _) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), @@ -862,7 +925,41 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ). reply_after_query( - Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result + Pid, + Id, + Index, + InflightTID, + Ref, + ?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query, + QueryOpts, + Result +) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => [Query], ref => Ref} + ), + Now = now_(), + case is_expired(ExpireAt, Now) of + true -> + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsFullBefore andalso ?MODULE:flush_worker(Pid), + ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), + ok; + false -> + do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result) + end. + +do_reply_after_query( + Pid, + Id, + Index, + InflightTID, + Ref, + ?QUERY(From, Request, HasBeenSent, _ExpireAt), + QueryOpts, + Result ) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the @@ -875,7 +972,7 @@ reply_after_query( %% Keep retrying. ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent), + batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -884,7 +981,7 @@ reply_after_query( ack -> ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent), + batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -896,6 +993,34 @@ reply_after_query( end. batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => Batch, ref => Ref} + ), + Now = now_(), + case sieve_expired_requests(Batch, Now) of + all_expired -> + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsFullBefore andalso ?MODULE:flush_worker(Pid), + ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}), + ok; + {NotExpired, Expired} -> + NumExpired = length(Expired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + NumExpired > 0 andalso + ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), + do_batch_reply_after_query( + Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result + ) + end. + +do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => Batch, ref => Ref} + ), %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. @@ -986,9 +1111,12 @@ inflight_new(InfltWinSZ, Id, Index) -> ), TableId. --spec inflight_get_first_retriable(ets:tid()) -> - none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. -inflight_get_first_retriable(InflightTID) -> +-spec inflight_get_first_retriable(ets:tid(), integer()) -> + none + | {expired, inflight_key(), [queue_query()]} + | {single, inflight_key(), queue_query()} + | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}. +inflight_get_first_retriable(InflightTID, Now) -> MatchSpec = ets:fun2ms( fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when @@ -1000,8 +1128,22 @@ inflight_get_first_retriable(InflightTID) -> case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> none; - {[{Ref, BatchOrQuery}], _Continuation} -> - {Ref, BatchOrQuery} + {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> + case is_expired(ExpireAt, Now) of + true -> + {expired, Ref, [Query]}; + false -> + {single, Ref, Query} + end; + {[{Ref, Batch = [_ | _]}], _Continuation} -> + %% batch is non-empty because we check that in + %% `sieve_expired_requests'. + case sieve_expired_requests(Batch, Now) of + all_expired -> + {expired, Ref, Batch}; + {NotExpired, Expired} -> + {batch, Ref, NotExpired, Expired} + end end. is_inflight_full(undefined) -> @@ -1030,7 +1172,7 @@ inflight_append(undefined, _InflightItem, _Id, _Index) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef), + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef), Id, Index ) -> @@ -1044,7 +1186,9 @@ inflight_append( ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef), + ?INFLIGHT_ITEM( + Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef + ), Id, Index ) -> @@ -1106,9 +1250,9 @@ ack_inflight(undefined, _Ref, _Id, _Index) -> ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> 1; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); _ -> 0 @@ -1133,6 +1277,12 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}), ok. +%% used to update a batch after dropping expired individual queries. +update_inflight_item(InflightTID, Ref, NewBatch) -> + _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), + ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), + ok. + %%============================================================================== inc_sent_failed(Id, _HasBeenSent = true) -> @@ -1180,11 +1330,14 @@ clear_disk_queue_dir(Id, Index) -> Res end. -ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> +ensure_flush_timer(Data = #{batch_time := T}) -> + ensure_flush_timer(Data, T). + +ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), Data#{tref => {TRef, Ref}}; -ensure_flush_timer(Data) -> +ensure_flush_timer(Data, _T) -> Data. cancel_flush_timer(St = #{tref := undefined}) -> @@ -1195,7 +1348,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> -spec make_message_ref() -> inflight_key(). make_message_ref() -> - erlang:monotonic_time(nanosecond). + now_(). collect_requests(Acc, Limit) -> Count = length(Acc), @@ -1213,9 +1366,9 @@ do_collect_requests(Acc, Count, Limit) -> mark_as_sent(Batch) when is_list(Batch) -> lists:map(fun mark_as_sent/1, Batch); -mark_as_sent(?QUERY(From, Req, _)) -> +mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) -> HasBeenSent = true, - ?QUERY(From, Req, HasBeenSent). + ?QUERY(From, Req, HasBeenSent, ExpireAt). is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; @@ -1235,3 +1388,49 @@ is_async_return({async_return, _}) -> true; is_async_return(_) -> false. + +sieve_expired_requests(Batch, Now) -> + {Expired, NotExpired} = + lists:partition( + fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) -> + is_expired(ExpireAt, Now) + end, + Batch + ), + case {NotExpired, Expired} of + {[], []} -> + %% Should be impossible for batch_size >= 1. + all_expired; + {[], [_ | _]} -> + all_expired; + {[_ | _], _} -> + {NotExpired, Expired} + end. + +-spec is_expired(infinity | integer(), integer()) -> boolean(). +is_expired(infinity = _ExpireAt, _Now) -> + false; +is_expired(ExpireAt, Now) -> + Now > ExpireAt. + +now_() -> + erlang:monotonic_time(nanosecond). + +-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts(). +ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> + Opts; +ensure_timeout_query_opts(#{} = Opts0, sync) -> + TimeoutMS = timer:seconds(15), + Opts0#{timeout => TimeoutMS}; +ensure_timeout_query_opts(#{} = Opts0, async) -> + Opts0#{timeout => infinity}. + +-spec ensure_expire_at(query_opts()) -> query_opts(). +ensure_expire_at(#{expire_at := _} = Opts) -> + Opts; +ensure_expire_at(#{timeout := infinity} = Opts) -> + Opts#{expire_at => infinity}; +ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> + TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond), + ExpireAt = now_() + TimeoutNS, + Opts#{expire_at => ExpireAt}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 95d1ed1d2..c01158b0a 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -134,8 +134,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'retried.success', 'retried.failed', 'success', + 'late_reply', 'failed', 'dropped', + 'dropped.expired', 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 455be9c22..28507e291 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -34,6 +34,9 @@ dropped_other_inc/1, dropped_other_inc/2, dropped_other_get/1, + dropped_expired_inc/1, + dropped_expired_inc/2, + dropped_expired_get/1, dropped_queue_full_inc/1, dropped_queue_full_inc/2, dropped_queue_full_get/1, @@ -46,6 +49,9 @@ failed_inc/1, failed_inc/2, failed_get/1, + late_reply_inc/1, + late_reply_inc/2, + late_reply_get/1, matched_inc/1, matched_inc/2, matched_get/1, @@ -75,9 +81,11 @@ events() -> [?TELEMETRY_PREFIX, Event] || Event <- [ dropped_other, + dropped_expired, dropped_queue_full, dropped_resource_not_found, dropped_resource_stopped, + late_reply, failed, inflight, matched, @@ -114,6 +122,9 @@ handle_telemetry_event( dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); + dropped_expired -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.expired', Val); dropped_queue_full -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); @@ -123,6 +134,8 @@ handle_telemetry_event( dropped_resource_stopped -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); + late_reply -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'late_reply', Val); failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); matched -> @@ -211,6 +224,30 @@ dropped_other_inc(ID, Val) -> dropped_other_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). +%% @doc Count of messages dropped due to being expired before being sent. +dropped_expired_inc(ID) -> + dropped_expired_inc(ID, 1). + +dropped_expired_inc(ID, Val) -> + telemetry:execute([?TELEMETRY_PREFIX, dropped_expired], #{counter_inc => Val}, #{ + resource_id => ID + }). + +dropped_expired_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.expired'). + +%% @doc Count of messages that were sent but received a late reply. +late_reply_inc(ID) -> + late_reply_inc(ID, 1). + +late_reply_inc(ID, Val) -> + telemetry:execute([?TELEMETRY_PREFIX, late_reply], #{counter_inc => Val}, #{ + resource_id => ID + }). + +late_reply_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'late_reply'). + %% @doc Count of messages dropped because the queue was full dropped_queue_full_inc(ID) -> dropped_queue_full_inc(ID, 1). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index c2b0c5733..4e3423808 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -260,7 +260,7 @@ counter_loop( ?tp(connector_demo_inc_counter_async, #{n => N}), State#{counter => Num + N}; {big_payload, _Payload, ReplyFun} when Status == blocked -> - apply_reply(ReplyFun, {error, blocked}), + apply_reply(ReplyFun, {error, {recoverable_error, blocked}}), State; {{FromPid, ReqRef}, {inc, N}} when Status == running -> %ct:pal("sync counter recv: ~p", [{inc, N}]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index c9325af2b..9b2af74f6 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -232,7 +232,7 @@ t_batch_query_counter(_) -> fun(Result, Trace) -> ?assertMatch({ok, 0}, Result), QueryTrace = ?of_kind(call_batch_query, Trace), - ?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace) + ?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace) end ), @@ -284,7 +284,7 @@ t_query_counter_async_query(_) -> fun(Trace) -> %% the callback_mode of 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), %% simple query ignores the query_mode and batching settings in the resource_worker @@ -295,7 +295,7 @@ t_query_counter_async_query(_) -> ?assertMatch({ok, 1000}, Result), %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -337,7 +337,7 @@ t_query_counter_async_callback(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), @@ -348,7 +348,7 @@ t_query_counter_async_callback(_) -> fun(Result, Trace) -> ?assertMatch({ok, 1000}, Result), QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), tap_metrics(?LINE), @@ -476,7 +476,7 @@ t_query_counter_async_inflight(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace), + ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace), ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), ok @@ -495,7 +495,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), @@ -605,8 +605,8 @@ t_query_counter_async_inflight_batch(_) -> [ #{ batch := [ - {query, _, {inc_counter, 1}, _}, - {query, _, {inc_counter, 1}, _} + {query, _, {inc_counter, 1}, _, _}, + {query, _, {inc_counter, 1}, _, _} ] } | _ @@ -687,7 +687,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], QueryTrace ) end @@ -712,7 +712,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], QueryTrace ) end @@ -1499,9 +1499,680 @@ t_async_pool_worker_death(_Config) -> ), ok. +t_expiration_sync_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(sync). + +t_expiration_sync_batch_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(sync). + +t_expiration_async_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(async). + +t_expiration_async_batch_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(async). + +do_t_expiration_before_sending(QueryMode) -> + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + ?force_ordering( + #{?snk_kind := buffer_worker_flush_before_pop}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{?snk_kind := buffer_worker_flush_before_sieve_expired} + ), + + TimeoutMS = 100, + spawn_link(fun() -> + case QueryMode of + sync -> + ?assertError( + timeout, + emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS}) + ); + async -> + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS}) + ) + end + end), + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_flush_all_expired}, 4 * TimeoutMS), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{batch := [{query, _, {inc_counter, 99}, _, _}]}], + ?of_kind(buffer_worker_flush_all_expired, Trace) + ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 2, + %% the block call + success := 1, + dropped := 1, + 'dropped.expired' := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_sync_before_sending_partial_batch(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_before_sending_partial_batch(sync). + +t_expiration_async_before_sending_partial_batch(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_before_sending_partial_batch(async). + +do_t_expiration_before_sending_partial_batch(QueryMode) -> + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + ?force_ordering( + #{?snk_kind := buffer_worker_flush_before_pop}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{?snk_kind := buffer_worker_flush_before_sieve_expired} + ), + + Pid0 = + spawn_link(fun() -> + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) + ), + ?tp(infinity_query_returned, #{}) + end), + TimeoutMS = 100, + Pid1 = + spawn_link(fun() -> + case QueryMode of + sync -> + ?assertError( + timeout, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ); + async -> + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ) + end + end), + Pid2 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS + ), + ok = emqx_resource:simple_sync_query(?ID, resume), + case QueryMode of + async -> + {ok, _} = ?block_until( + #{ + ?snk_kind := buffer_worker_reply_after_query, + action := ack, + batch_or_query := [{query, _, {inc_counter, 99}, _, _}] + }, + 10 * TimeoutMS + ); + sync -> + %% more time because it needs to retry if sync + {ok, _} = ?block_until(#{?snk_kind := infinity_query_returned}, 20 * TimeoutMS) + end, + + lists:foreach( + fun(Pid) -> + unlink(Pid), + exit(Pid, kill) + end, + [Pid0, Pid1, Pid2] + ), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}], + not_expired := [{query, _, {inc_counter, 99}, _, _}] + } + ], + ?of_kind(buffer_worker_flush_potentially_partial, Trace) + ), + wait_until_gauge_is(inflight, 0, 500), + Metrics = tap_metrics(?LINE), + case QueryMode of + async -> + ?assertMatch( + #{ + counters := #{ + matched := 4, + %% the block call, the request with + %% infinity timeout, and the resume + %% call. + success := 3, + dropped := 1, + 'dropped.expired' := 1, + %% was sent successfully and held by + %% the test connector. + retried := 0, + failed := 0 + } + }, + Metrics + ); + sync -> + ?assertMatch( + #{ + counters := #{ + matched := 4, + %% the block call, the request with + %% infinity timeout, and the resume + %% call. + success := 3, + dropped := 1, + 'dropped.expired' := 1, + %% currently, the test connector + %% replies with an error that may make + %% the buffer worker retry. + retried := Retried, + failed := 0 + } + } when Retried =< 1, + Metrics + ) + end, + ok + end + ), + ok. + +t_expiration_async_after_reply(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_async_after_reply(single). + +t_expiration_async_batch_after_reply(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 2_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_async_after_reply(batch). + +do_t_expiration_async_after_reply(IsBatch) -> + ?check_trace( + begin + NAcks = + case IsBatch of + batch -> 1; + single -> 2 + end, + ?force_ordering( + #{?snk_kind := buffer_worker_flush_ack}, + NAcks, + #{?snk_kind := delay_enter}, + _Guard = true + ), + ?force_ordering( + #{?snk_kind := delay}, + #{ + ?snk_kind := buffer_worker_reply_after_query_enter, + batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + } + ), + + TimeoutMS = 100, + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ), + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) + ), + Pid0 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS + ), + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + ), + + unlink(Pid0), + exit(Pid0, kill), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}] + } + ], + ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ), + wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 2, + %% the request with infinity timeout. + success := 1, + dropped := 0, + late_reply := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_batch_all_expired_after_reply(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := buffer_worker_flush_ack}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{ + ?snk_kind := buffer_worker_reply_after_query_enter, + batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + } + ), + + TimeoutMS = 200, + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ), + Pid0 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + ), + + unlink(Pid0), + exit(Pid0, kill), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}] + } + ], + ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 1, + success := 0, + dropped := 0, + late_reply := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_retry(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 300 + } + ), + do_t_expiration_retry(single). + +t_expiration_retry_batch(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 300 + } + ), + do_t_expiration_retry(batch). + +do_t_expiration_retry(IsBatch) -> + ResumeInterval = 300, + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 1, + 200 + ), + TimeoutMS = 100, + %% the request that expires must be first, so it's the + %% head of the inflight table (and retriable). + {ok, SRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_appended_to_queue}), + 1, + ResumeInterval * 2 + ), + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 1}, + #{timeout => TimeoutMS} + ) + ) + end), + Pid1 = + spawn_link(fun() -> + receive + go -> ok + end, + ?assertEqual( + ok, + emqx_resource:query( + ?ID, + {inc_counter, 2}, + #{timeout => infinity} + ) + ) + end), + {ok, _} = snabbkaffe:receive_events(SRef1), + Pid1 ! go, + {ok, _} = snabbkaffe:receive_events(SRef0), + + {ok, _} = + ?block_until( + #{?snk_kind := buffer_worker_retry_expired}, + ResumeInterval * 10 + ), + + SuccessEventKind = + case IsBatch of + batch -> buffer_worker_retry_inflight_succeeded; + single -> buffer_worker_flush_ack + end, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := SuccessEventKind}, + ResumeInterval * 5 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{expired := [{query, _, {inc_counter, 1}, _, _}]}], + ?of_kind(buffer_worker_retry_expired, Trace) + ), + ok + end + ), + ok. + +t_expiration_retry_batch_multiple_times(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 1, + 200 + ), + TimeoutMS = 100, + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 1}, + #{timeout => TimeoutMS} + ) + ) + end), + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 2}, + #{timeout => ResumeInterval + TimeoutMS} + ) + ) + end), + {ok, _} = snabbkaffe:receive_events(SRef), + + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := buffer_worker_retry_expired}), + ResumeInterval * 10 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{expired := [{query, _, {inc_counter, 1}, _, _}]}, + #{expired := [{query, _, {inc_counter, 2}, _, _}]} + ], + ?of_kind(buffer_worker_retry_expired, Trace) + ), + ok + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ + inc_counter_in_parallel(N) -> inc_counter_in_parallel(N, #{}). @@ -1564,6 +2235,81 @@ tap_metrics(Line) -> ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), #{counters => C, gauges => G}. +install_telemetry_handler(TestCase) -> + Tid = ets:new(TestCase, [ordered_set, public]), + HandlerId = TestCase, + TestPid = self(), + _ = telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + ets:insert(Tid, {erlang:monotonic_time(), Data}), + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> + telemetry:detach(HandlerId), + ets:delete(Tid) + end), + put({?MODULE, telemetry_table}, Tid), + Tid. + +wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> + Events = receive_all_events(GaugeName, Timeout), + case length(Events) > 0 andalso lists:last(Events) of + #{measurements := #{gauge_set := ExpectedValue}} -> + ok; + #{measurements := #{gauge_set := Value}} -> + ct:fail( + "gauge ~p didn't reach expected value ~p; last value: ~p", + [GaugeName, ExpectedValue, Value] + ); + false -> + ct:pal("no ~p gauge events received!", [GaugeName]) + end. + +receive_all_events(EventName, Timeout) -> + receive_all_events(EventName, Timeout, []). + +receive_all_events(EventName, Timeout, Acc) -> + receive + {telemetry, #{name := [_, _, EventName]} = Event} -> + receive_all_events(EventName, Timeout, [Event | Acc]) + after Timeout -> + lists:reverse(Acc) + end. + +wait_telemetry_event(EventName) -> + wait_telemetry_event(EventName, #{timeout => 5_000, n_events => 1}). + +wait_telemetry_event( + EventName, + Opts0 +) -> + DefaultOpts = #{timeout => 5_000, n_events => 1}, + #{timeout := Timeout, n_events := NEvents} = maps:merge(DefaultOpts, Opts0), + wait_n_events(NEvents, Timeout, EventName). + +wait_n_events(NEvents, _Timeout, _EventName) when NEvents =< 0 -> + ok; +wait_n_events(NEvents, Timeout, EventName) -> + TelemetryTable = get({?MODULE, telemetry_table}), + receive + {telemetry, #{name := [_, _, EventName]}} -> + wait_n_events(NEvents - 1, Timeout, EventName) + after Timeout -> + RecordedEvents = ets:tab2list(TelemetryTable), + ct:pal("recorded events: ~p", [RecordedEvents]), + error({timeout_waiting_for_telemetry, EventName}) + end. + assert_sync_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 47a4646de..c9560739f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -350,10 +350,12 @@ service_account_json(PrivateKeyPEM) -> metrics_mapping() -> #{ dropped => fun emqx_resource_metrics:dropped_get/1, + dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1, dropped_other => fun emqx_resource_metrics:dropped_other_get/1, dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, + late_reply => fun emqx_resource_metrics:late_reply_get/1, failed => fun emqx_resource_metrics:failed_get/1, inflight => fun emqx_resource_metrics:inflight_get/1, matched => fun emqx_resource_metrics:matched_get/1, @@ -1117,9 +1119,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> CurrentMetrics ); {timeout, async} -> - wait_telemetry_event(TelemetryTable, success, ResourceId, #{ - timeout => 10_000, n_events => 2 - }), wait_until_gauge_is(inflight, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400), assert_metrics( @@ -1130,7 +1129,8 @@ do_econnrefused_or_timeout_test(Config, Error) -> matched => 2, queuing => 0, retried => 0, - success => 2 + success => 0, + late_reply => 2 }, ResourceId ); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 041bdec08..898c36fe0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -58,7 +58,6 @@ %% emqx_resource API %%------------------------------------------------------------------------------------------------- -%% TODO: check is_buffer_supported() -> false. callback_mode() -> async_if_possible. From ca4a262b758ba7adc537813292f76242bf4f96c1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 20 Jan 2023 08:24:53 -0300 Subject: [PATCH 4/4] refactor: re-organize dealing with unrecoverable errors --- .../test/emqx_bridge_mqtt_SUITE.erl | 11 ++- .../src/emqx_connector_pgsql.erl | 6 +- .../src/emqx_resource_buffer_worker.erl | 81 ++++++++++--------- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 15 ++-- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 33 +++++++- .../test/emqx_ee_bridge_mysql_SUITE.erl | 2 +- .../test/emqx_ee_bridge_pgsql_SUITE.erl | 26 ++++-- .../src/emqx_ee_connector_influxdb.erl | 34 +++++++- 8 files changed, 141 insertions(+), 67 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1f5b06fab..a99f06f20 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -957,12 +957,11 @@ assert_mqtt_msg_received(Topic, Payload) -> receive {deliver, Topic, #message{payload = Payload}} -> ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), - ok; - Msg -> - ct:pal("Unexpected Msg: ~p", [Msg]), - assert_mqtt_msg_received(Topic, Payload) - after 100 -> - ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic]) + ok + after 300 -> + {messages, Messages} = process_info(self(), messages), + Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]), + error({Msg, #{messages => Messages}}) end. request(Method, Url, Body) -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 34defb5e5..890227b9d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -189,8 +189,8 @@ on_batch_query( Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of - {error, Error} -> - {error, Error}; + {error, _Error} = Result -> + handle_result(Result); {_Column, Results} -> handle_batch_result(Results, 0) end @@ -417,6 +417,8 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, disconnected}) -> + {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> {error, {unrecoverable_error, Error}}; handle_result(Res) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8b79ce5a8..669b8e474 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -783,45 +783,26 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> ok end, {nack, PostFn}; -handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), - ok - end, - {ack, PostFn}; -handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> - %% the message will be queued in replayq or inflight window, - %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not - %% sent this message. - PostFn = fun() -> - ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), - ok - end, - {ack, PostFn}; -handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) -> - {ack, fun() -> ok end}; -handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) -> - {ack, fun() -> ok end}; +handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> + case is_unrecoverable_error(Error) of + true -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; + false -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), + ok + end, + {nack, PostFn} + end; +handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) -> + handle_query_async_result_pure(Id, Result, HasBeenSent); handle_query_result_pure(Id, Result, HasBeenSent) -> PostFn = fun() -> assert_ok_result(Result), @@ -830,6 +811,28 @@ handle_query_result_pure(Id, Result, HasBeenSent) -> end, {ack, PostFn}. +handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> + case is_unrecoverable_error(Error) of + true -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; + false -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}), + ok + end, + {nack, PostFn} + end; +handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> + {ack, fun() -> ok end}; +handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> + {ack, fun() -> ok end}. + handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index c9560739f..247b7799b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -1135,8 +1135,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> ResourceId ); {_, sync} -> - %% even waiting, hard to avoid flakiness... simpler to just sleep - %% a bit until stabilization. wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 1, 500), assert_metrics( @@ -1336,12 +1334,19 @@ t_unrecoverable_error(Config) -> ), wait_until_gauge_is(queuing, 0, _Timeout = 400), - wait_until_gauge_is(inflight, 1, _Timeout = 400), + %% TODO: once temporary clause in + %% `emqx_resource_buffer_worker:is_unrecoverable_error' + %% that marks all unknown errors as unrecoverable is + %% removed, this inflight should be 1, because we retry if + %% the worker is killed. + wait_until_gauge_is(inflight, 0, _Timeout = 400), assert_metrics( #{ dropped => 0, - failed => 0, - inflight => 1, + %% FIXME: see comment above; failed should be 0 + %% and inflight should be 1. + failed => 1, + inflight => 0, matched => 1, queuing => 0, retried => 0, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index fc7dce418..e1899b1b2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -277,6 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" + " request_timeout = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -313,6 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" + " request_timeout = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -906,9 +908,23 @@ t_write_failure(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> case QueryMode of sync -> - ?assertError(timeout, send_message(Config, SentData)); + {_, {ok, _}} = + ?wait_async_action( + try + send_message(Config, SentData) + catch + error:timeout -> + {error, timeout} + end, + #{?snk_kind := buffer_worker_flush_nack}, + 1_000 + ); async -> - ?assertEqual(ok, send_message(Config, SentData)) + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := buffer_worker_reply_after_query}, + 1_000 + ) end end), fun(Trace0) -> @@ -920,11 +936,20 @@ t_write_failure(Config) -> ?assert( {error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, closed}} =:= Result orelse - {error, {error, econnrefused}} =:= Result, + {error, {recoverable_error, {error, econnrefused}}} =:= Result, #{got => Result} ); async -> - ok + Trace = ?of_kind(buffer_worker_reply_after_query, Trace0), + ?assertMatch([#{action := nack} | _], Trace), + [#{result := Result} | _] = Trace, + ?assert( + {error, {recoverable_error, {closed, "The connection was lost."}}} =:= + Result orelse + {error, {error, closed}} =:= Result orelse + {error, {recoverable_error, econnrefused}} =:= Result, + #{got => Result} + ) end, ok end diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 3bac01c66..57792b366 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -228,7 +228,7 @@ query_resource(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request). + emqx_resource:query(ResourceID, Request, #{timeout => 500}). unprepare(Config, Key) -> Name = ?config(mysql_name, Config), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index bdbbed8cf..25752f685 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -249,7 +249,7 @@ query_resource(Config, Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request). + emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). connect_direct_pgsql(Config) -> Opts = #{ @@ -422,12 +422,22 @@ t_write_failure(Config) -> SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - case QueryMode of - sync -> - ?assertError(timeout, send_message(Config, SentData)); - async -> - send_message(Config, SentData) - end + {_, {ok, _}} = + ?wait_async_action( + case QueryMode of + sync -> + try + send_message(Config, SentData) + catch + error:timeout -> + {error, timeout} + end; + async -> + send_message(Config, SentData) + end, + #{?snk_kind := buffer_worker_flush_nack}, + 1_000 + ) end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), @@ -437,7 +447,7 @@ t_write_failure(Config) -> case Error of {resource_error, _} -> ok; - disconnected -> + {recoverable_error, disconnected} -> ok; _ -> ct:fail("unexpected error: ~p", [Error]) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 0037242b3..1ae4d9874 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -26,6 +26,7 @@ on_batch_query_async/4, on_get_status/2 ]). +-export([reply_callback/2]). -export([ namespace/0, @@ -353,7 +354,12 @@ do_query(InstId, Client, Points) -> connector => InstId, reason => Reason }), - Err + case is_unrecoverable_error(Err) of + true -> + {error, {unrecoverable_error, Reason}}; + false -> + {error, {recoverable_error, Reason}} + end end. do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> @@ -362,7 +368,20 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> connector => InstId, points => Points }), - {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs). + WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, + {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs). + +reply_callback(ReplyFunAndArgs, {error, Reason} = Error) -> + case is_unrecoverable_error(Error) of + true -> + Result = {error, {unrecoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); + false -> + Result = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end; +reply_callback(ReplyFunAndArgs, Result) -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans @@ -583,6 +602,17 @@ str(B) when is_binary(B) -> str(S) when is_list(S) -> S. +is_unrecoverable_error({error, {unrecoverable_error, _}}) -> + true; +is_unrecoverable_error({error, {recoverable_error, _}}) -> + false; +is_unrecoverable_error({error, {error, econnrefused}}) -> + false; +is_unrecoverable_error({error, econnrefused}) -> + false; +is_unrecoverable_error(_) -> + false. + %%=================================================================== %% eunit tests %%===================================================================