From 29ae45c39d57484efaae851597ce33b32b531d88 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 1 Dec 2023 15:10:00 -0300 Subject: [PATCH] fix(connector): don't start buffer workers for the connector itself Fixes https://emqx.atlassian.net/browse/EMQX-11448 --- .../src/emqx_connector_resource.erl | 6 ++- .../test/emqx_connector_SUITE.erl | 50 +++++++++++++++++-- .../test/emqx_connector_dummy_impl.erl | 2 + apps/emqx_resource/include/emqx_resource.hrl | 5 +- .../src/emqx_resource_manager.erl | 17 +++---- 5 files changed, 62 insertions(+), 18 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index ff2790481..72576b22e 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -382,9 +382,13 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8); safe_atom(Atom) when is_atom(Atom) -> Atom. parse_opts(Conf, Opts0) -> - override_start_after_created(Conf, Opts0). + Opts1 = override_start_after_created(Conf, Opts0), + set_no_buffer_workers(Opts1). override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled), Opts#{start_after_created => StartAfterCreated}. + +set_no_buffer_workers(Opts) -> + Opts#{spawn_buffer_workers => false}. diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 669d05442..f0e021490 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -163,11 +163,11 @@ t_remove_fail({'init', Config}) -> meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_get_status, 2, connected), - [{mocked_mods, [?CONNECTOR, emqx_connector_ee_schema]} | Config]; -t_remove_fail({'end', Config}) -> - MockedMods = ?config(mocked_mods, Config), - meck:unload(MockedMods), + meck:expect(?CONNECTOR, query_mode, 1, simple_async_internal_buffer), Config; +t_remove_fail({'end', _Config}) -> + meck:unload(), + ok; t_remove_fail(_Config) -> ?assertEqual( [], @@ -200,7 +200,20 @@ t_remove_fail(_Config) -> {_, {?CONNECTOR, on_add_channel, _}, {ok, connector_state}}, {_, {?CONNECTOR, on_get_channels, [_]}, _} ], - meck:history(?CONNECTOR) + lists:filter( + fun({_, {?CONNECTOR, Fun, _Args}, _}) -> + lists:member( + Fun, [ + callback_mode, + on_start, + on_get_channels, + on_get_status, + on_add_channel + ] + ) + end, + meck:history(?CONNECTOR) + ) ), ok. @@ -269,6 +282,33 @@ t_create_with_bad_name_root_path(_Config) -> ), ok. +t_no_buffer_workers({'init', Config}) -> + meck:new(emqx_connector_ee_schema, [passthrough]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR), + meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), + meck:expect(?CONNECTOR, on_get_channels, 1, []), + meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR, on_stop, 2, ok), + meck:expect(?CONNECTOR, on_get_status, 2, connected), + meck:expect(?CONNECTOR, query_mode, 1, sync), + [ + {path, [connectors, kafka_producer, no_bws]} + | Config + ]; +t_no_buffer_workers({'end', Config}) -> + Path = ?config(path, Config), + {ok, _} = emqx:remove_config(Path), + meck:unload(), + ok; +t_no_buffer_workers(Config) -> + Path = ?config(path, Config), + ConnConfig = connector_config(), + ?assertMatch({ok, _}, emqx:update_config(Path, ConnConfig)), + ?assertEqual([], supervisor:which_children(emqx_resource_buffer_worker_sup)), + ok. + %% helpers connector_config() -> diff --git a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl index 9dca42868..105788cc1 100644 --- a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl +++ b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl @@ -17,6 +17,7 @@ -module(emqx_connector_dummy_impl). -export([ + query_mode/1, callback_mode/0, on_start/2, on_stop/2, @@ -24,6 +25,7 @@ on_get_channel_status/3 ]). +query_mode(_) -> error(unexpected). callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected). diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 7a8fdedef..faf96ac9b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -101,7 +101,10 @@ max_buffer_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), - inflight_window => pos_integer() + inflight_window => pos_integer(), + %% Only for `emqx_resource_manager' usage. If false, prevents spawning buffer + %% workers, regardless of resource query mode. + spawn_buffer_workers => boolean() }. -type query_result() :: ok diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 67f22faee..e19494620 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -136,16 +136,9 @@ create(ResId, Group, ResourceType, Config, Opts) -> % Create metrics for the resource ok = emqx_resource:create_metrics(ResId), QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), - case QueryMode of - %% the resource has built-in buffer, so there is no need for resource workers - simple_sync_internal_buffer -> - ok; - simple_async_internal_buffer -> - ok; - %% The resource is a consumer resource, so there is no need for resource workers - no_queries -> - ok; - _ -> + SpawnBufferWorkers = maps:get(spawn_buffer_workers, Opts, true), + case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of + true -> %% start resource workers as the query type requires them ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of @@ -153,7 +146,9 @@ create(ResId, Group, ResourceType, Config, Opts) -> wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); false -> ok - end + end; + false -> + ok end. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.