diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 59b41c7fa..5b3fe796b 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -65,7 +65,7 @@ load() -> fun({Type, NamedConf}) -> lists:foreach( fun({Name, Conf}) -> - %% fetch opts for `emqx_resource_worker` + %% fetch opts for `emqx_resource_buffer_worker` ResOpts = emqx_resource:fetch_creation_opts(Conf), safe_load_bridge(Type, Name, Conf, ResOpts) end, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index d20d3bc10..1b4eac73e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -886,9 +886,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> {ok, SRef} = snabbkaffe:subscribe( fun - (#{?snk_kind := resource_worker_retry_inflight_failed}) -> + (#{?snk_kind := buffer_worker_retry_inflight_failed}) -> true; - (#{?snk_kind := resource_worker_flush_nack}) -> + (#{?snk_kind := buffer_worker_flush_nack}) -> true; (_) -> false diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0fd21bfcd..bb27b6acd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -255,7 +255,7 @@ reset_metrics(ResId) -> query(ResId, Request) -> query(ResId, Request, #{}). --spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> +-spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of @@ -263,11 +263,11 @@ query(ResId, Request, Opts) -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> - emqx_resource_worker:simple_sync_query(ResId, Request); + emqx_resource_buffer_worker:simple_sync_query(ResId, Request); {false, sync} -> - emqx_resource_worker:sync_query(ResId, Request, Opts); + emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> - emqx_resource_worker:async_query(ResId, Request, Opts) + emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -275,7 +275,7 @@ query(ResId, Request, Opts) -> -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> - emqx_resource_worker:simple_sync_query(ResId, Request). + emqx_resource_buffer_worker:simple_sync_query(ResId, Request). -spec start(resource_id()) -> ok | {error, Reason :: term()}. start(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl similarity index 97% rename from apps/emqx_resource/src/emqx_resource_worker.erl rename to apps/emqx_resource/src/emqx_resource_buffer_worker.erl index d7010c3bd..4b16ed3d5 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -17,7 +17,7 @@ %% This module implements async message sending, disk message queuing, %% and message batching using ReplayQ. --module(emqx_resource_worker). +-module(emqx_resource_buffer_worker). -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). @@ -176,11 +176,11 @@ init({Id, Index, Opts}) -> resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined }, - ?tp(resource_worker_init, #{id => Id, index => Index}), + ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. running(enter, _, St) -> - ?tp(resource_worker_enter_running, #{}), + ?tp(buffer_worker_enter_running, #{}), maybe_flush(St); running(cast, resume, _St) -> keep_state_and_data; @@ -206,7 +206,7 @@ running(info, Info, _St) -> keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = _St) -> - ?tp(resource_worker_enter_blocked, #{}), + ?tp(buffer_worker_enter_blocked, #{}), {keep_state_and_data, {state_timeout, ResumeT, unblock}}; blocked(cast, block, _St) -> keep_state_and_data; @@ -315,7 +315,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> index := Index, resume_interval := ResumeT } = Data0, - ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), + ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{simple_query => false}, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = @@ -331,7 +331,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {nack, PostFn} -> PostFn(), ?tp( - resource_worker_retry_inflight_failed, + buffer_worker_retry_inflight_failed, #{ ref => Ref, query_or_batch => QueryOrBatch @@ -349,7 +349,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% we bump the counter when removing it from the table. IsAcked andalso PostFn(), ?tp( - resource_worker_retry_inflight_succeeded, + buffer_worker_retry_inflight_succeeded, #{ ref => Ref, query_or_batch => QueryOrBatch @@ -415,7 +415,7 @@ flush(Data0) -> {0, _} -> {keep_state, Data1}; {_, true} -> - ?tp(resource_worker_flush_but_inflight_full, #{}), + ?tp(buffer_worker_flush_but_inflight_full, #{}), Data2 = ensure_flush_timer(Data1), {keep_state, Data2}; {_, false} -> @@ -483,7 +483,7 @@ do_flush( store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_nack, + buffer_worker_flush_nack, #{ ref => Ref, is_retriable => IsRetriable, @@ -512,7 +512,7 @@ do_flush( store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_ack, + buffer_worker_flush_ack, #{ batch_or_query => Request, result => Result @@ -560,7 +560,7 @@ do_flush(Data0, #{ store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_nack, + buffer_worker_flush_nack, #{ ref => Ref, is_retriable => IsRetriable, @@ -589,7 +589,7 @@ do_flush(Data0, #{ store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( - resource_worker_flush_ack, + buffer_worker_flush_ack, #{ batch_or_query => Batch, result => Result @@ -873,7 +873,7 @@ reply_after_query( case Action of nack -> %% Keep retrying. - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), ref => Ref, @@ -882,7 +882,7 @@ reply_after_query( mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), ref => Ref, @@ -903,7 +903,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu case Action of nack -> %% Keep retrying. - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => nack, batch_or_query => Batch, ref => Ref, @@ -912,7 +912,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(resource_worker_reply_after_query, #{ + ?tp(buffer_worker_reply_after_query, #{ action => ack, batch_or_query => Batch, ref => Ref, @@ -955,7 +955,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> end, emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp( - resource_worker_appended_to_queue, + buffer_worker_appended_to_queue, #{ id => Id, items => Queries, @@ -973,7 +973,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( - emqx_resource_worker_inflight_tab, + emqx_resource_buffer_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), @@ -1040,7 +1040,7 @@ inflight_append( BatchSize = length(Batch), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), + ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; inflight_append( InflightTID, @@ -1053,7 +1053,7 @@ inflight_append( IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), + ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> ets:insert(InflightTID, {Ref, Data}), @@ -1130,7 +1130,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), - ?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}), + ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}), ok. %%============================================================================== diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl similarity index 94% rename from apps/emqx_resource/src/emqx_resource_worker_sup.erl rename to apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 8b0ce2c65..4987946c9 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_resource_worker_sup). +-module(emqx_resource_buffer_worker_sup). -behaviour(supervisor). %%%============================================================================= @@ -99,7 +99,7 @@ ensure_worker_added(ResId, Idx) -> -define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). ensure_worker_started(ResId, Idx, Opts) -> - Mod = emqx_resource_worker, + Mod = emqx_resource_buffer_worker, Spec = #{ id => ?CHILD_ID(Mod, ResId, Idx), start => {Mod, start_link, [ResId, Idx, Opts]}, @@ -116,7 +116,7 @@ ensure_worker_started(ResId, Idx, Opts) -> end. ensure_worker_removed(ResId, Idx) -> - ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), + ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx), case supervisor:terminate_child(?SERVER, ChildId) of ok -> Res = supervisor:delete_child(?SERVER, ChildId), @@ -129,7 +129,7 @@ ensure_worker_removed(ResId, Idx) -> end. ensure_disk_queue_dir_absent(ResourceId, Index) -> - ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index), + ok = emqx_resource_buffer_worker:clear_disk_queue_dir(ResourceId, Index), ok. ensure_worker_pool_removed(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 1dd088fca..5a3704337 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -145,7 +145,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> %% buffer, so there is no need for resource workers ok; false -> - ok = emqx_resource_worker_sup:start_workers(ResId, Opts), + ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); @@ -468,7 +468,7 @@ retry_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> stop_resource(Data), - ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), + ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok @@ -582,9 +582,9 @@ maybe_alarm(_Status, ResId) -> maybe_resume_resource_workers(connected) -> lists:foreach( fun({_, Pid, _, _}) -> - emqx_resource_worker:resume(Pid) + emqx_resource_buffer_worker:resume(Pid) end, - supervisor:which_children(emqx_resource_worker_sup) + supervisor:which_children(emqx_resource_buffer_worker_sup) ); maybe_resume_resource_workers(_) -> ok. diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index ea31b8b6b..4d9abb03d 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -39,8 +39,8 @@ init([]) -> modules => [emqx_resource_manager_sup] }, WorkerSup = #{ - id => emqx_resource_worker_sup, - start => {emqx_resource_worker_sup, start_link, []}, + id => emqx_resource_buffer_worker_sup, + start => {emqx_resource_buffer_worker_sup, start_link, []}, restart => permanent, shutdown => infinity, type => supervisor diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 97bc8da66..c9325af2b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -414,7 +414,7 @@ t_query_counter_async_inflight(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(WindowSize, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), fun(Trace) -> @@ -439,7 +439,7 @@ t_query_counter_async_inflight(_) -> emqx_resource:query(?ID, {inc_counter, 99}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), - #{?snk_kind := resource_worker_appended_to_queue}, + #{?snk_kind := buffer_worker_appended_to_queue}, 1_000 ), tap_metrics(?LINE), @@ -490,7 +490,7 @@ t_query_counter_async_inflight(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(WindowSize, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), fun(Trace) -> @@ -596,7 +596,7 @@ t_query_counter_async_inflight_batch(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(NumMsgs, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), fun(Trace) -> @@ -623,7 +623,7 @@ t_query_counter_async_inflight_batch(_) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {inc_counter, 2}), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), ?assertMatch(0, ets:info(Tab0, size)), @@ -646,7 +646,7 @@ t_query_counter_async_inflight_batch(_) -> emqx_resource:query(?ID, {inc_counter, 3}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), - #{?snk_kind := resource_worker_appended_to_queue}, + #{?snk_kind := buffer_worker_appended_to_queue}, 1_000 ), tap_metrics(?LINE), @@ -706,7 +706,7 @@ t_query_counter_async_inflight_batch(_) -> {_, {ok, _}} = ?wait_async_action( inc_counter_in_parallel(NumMsgs, ReqOpts), - #{?snk_kind := resource_worker_flush_but_inflight_full}, + #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), fun(Trace) -> @@ -1055,7 +1055,7 @@ t_retry_batch(_Config) -> end, Payloads ), - #{?snk_kind := resource_worker_enter_blocked}, + #{?snk_kind := buffer_worker_enter_blocked}, 5_000 ), %% now the individual messages should have been counted @@ -1066,7 +1066,7 @@ t_retry_batch(_Config) -> %% batch shall remain enqueued. {ok, _} = snabbkaffe:block_until( - ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}), + ?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}), 5_000 ), %% should not have increased the matched count with the retries @@ -1078,7 +1078,7 @@ t_retry_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, 5_000 ), %% 1 more because of the `resume' call @@ -1140,7 +1140,7 @@ t_delete_and_re_create_with_same_name(_Config) -> ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), NumRequests = 10, {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_enter_blocked}), + ?match_event(#{?snk_kind := buffer_worker_enter_blocked}), NumBufferWorkers, _Timeout = 5_000 ), @@ -1189,7 +1189,7 @@ t_delete_and_re_create_with_same_name(_Config) -> resume_interval => 1_000 } ), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, 5_000 ), @@ -1271,13 +1271,13 @@ t_retry_sync_inflight(_Config) -> Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), TestPid ! {res, Res} end), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), receive @@ -1322,13 +1322,13 @@ t_retry_sync_inflight_batch(_Config) -> Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), TestPid ! {res, Res} end), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_inflight_succeeded}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), receive @@ -1368,7 +1368,7 @@ t_retry_async_inflight(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1376,7 +1376,7 @@ t_retry_async_inflight(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, ResumeInterval * 2 ), ok @@ -1411,7 +1411,7 @@ t_retry_async_inflight_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_retry_inflight_failed}, + #{?snk_kind := buffer_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1419,7 +1419,7 @@ t_retry_async_inflight_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_enter_running}, + #{?snk_kind := buffer_worker_enter_running}, ResumeInterval * 2 ), ok @@ -1459,7 +1459,7 @@ t_async_pool_worker_death(_Config) -> NumReqs = 10, {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}), + ?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}), NumReqs, 1_000 ), @@ -1472,7 +1472,7 @@ t_async_pool_worker_death(_Config) -> %% grab one of the worker pids and kill it {ok, SRef1} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_worker_down_update}), + ?match_event(#{?snk_kind := buffer_worker_worker_down_update}), NumBufferWorkers, 10_000 ), @@ -1568,8 +1568,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := resource_worker_flush_nack, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_flush_nack, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) ), @@ -1577,8 +1577,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> %% before restoring the resource health. ?assert( ?causality( - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref}, Trace ) ), @@ -1588,8 +1588,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) ), @@ -1597,8 +1597,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> %% before restoring the resource health. ?assert( ?causality( - #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, - #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref}, Trace ) ), diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 71f8a8399..fadf05848 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -95,7 +95,8 @@ fields("config") -> } )} ] ++ - emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); + (emqx_connector_mysql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 7e21d4dd7..8bf7b1969 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -97,7 +97,8 @@ fields("config") -> } )} ] ++ - emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); + (emqx_connector_pgsql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 0372c21ea..fc7dce418 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -914,7 +914,7 @@ t_write_failure(Config) -> fun(Trace0) -> case QueryMode of sync -> - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([_ | _], Trace), [#{result := Result} | _] = Trace, ?assert( diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index ce38c357d..3bac01c66 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -413,7 +413,7 @@ t_write_failure(Config) -> end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, case Error of diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index f39ecc1dc..bdbbed8cf 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -431,7 +431,7 @@ t_write_failure(Config) -> end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), - Trace = ?of_kind(resource_worker_flush_nack, Trace0), + Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, case Error of