fix(connector): don't start buffer workers for the connector itself

Fixes https://emqx.atlassian.net/browse/EMQX-11448
This commit is contained in:
Thales Macedo Garitezi 2023-12-01 15:10:00 -03:00
parent 3d2e95fe3e
commit 29ae45c39d
5 changed files with 62 additions and 18 deletions

View File

@ -382,9 +382,13 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
safe_atom(Atom) when is_atom(Atom) -> Atom.
parse_opts(Conf, Opts0) ->
override_start_after_created(Conf, Opts0).
Opts1 = override_start_after_created(Conf, Opts0),
set_no_buffer_workers(Opts1).
override_start_after_created(Config, Opts) ->
Enabled = maps:get(enable, Config, true),
StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),
Opts#{start_after_created => StartAfterCreated}.
set_no_buffer_workers(Opts) ->
Opts#{spawn_buffer_workers => false}.

View File

@ -163,11 +163,11 @@ t_remove_fail({'init', Config}) ->
meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
[{mocked_mods, [?CONNECTOR, emqx_connector_ee_schema]} | Config];
t_remove_fail({'end', Config}) ->
MockedMods = ?config(mocked_mods, Config),
meck:unload(MockedMods),
meck:expect(?CONNECTOR, query_mode, 1, simple_async_internal_buffer),
Config;
t_remove_fail({'end', _Config}) ->
meck:unload(),
ok;
t_remove_fail(_Config) ->
?assertEqual(
[],
@ -200,7 +200,20 @@ t_remove_fail(_Config) ->
{_, {?CONNECTOR, on_add_channel, _}, {ok, connector_state}},
{_, {?CONNECTOR, on_get_channels, [_]}, _}
],
meck:history(?CONNECTOR)
lists:filter(
fun({_, {?CONNECTOR, Fun, _Args}, _}) ->
lists:member(
Fun, [
callback_mode,
on_start,
on_get_channels,
on_get_status,
on_add_channel
]
)
end,
meck:history(?CONNECTOR)
)
),
ok.
@ -269,6 +282,33 @@ t_create_with_bad_name_root_path(_Config) ->
),
ok.
t_no_buffer_workers({'init', Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, []),
meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
meck:expect(?CONNECTOR, query_mode, 1, sync),
[
{path, [connectors, kafka_producer, no_bws]}
| Config
];
t_no_buffer_workers({'end', Config}) ->
Path = ?config(path, Config),
{ok, _} = emqx:remove_config(Path),
meck:unload(),
ok;
t_no_buffer_workers(Config) ->
Path = ?config(path, Config),
ConnConfig = connector_config(),
?assertMatch({ok, _}, emqx:update_config(Path, ConnConfig)),
?assertEqual([], supervisor:which_children(emqx_resource_buffer_worker_sup)),
ok.
%% helpers
connector_config() ->

View File

@ -17,6 +17,7 @@
-module(emqx_connector_dummy_impl).
-export([
query_mode/1,
callback_mode/0,
on_start/2,
on_stop/2,
@ -24,6 +25,7 @@
on_get_channel_status/3
]).
query_mode(_) -> error(unexpected).
callback_mode() -> error(unexpected).
on_start(_, _) -> error(unexpected).
on_stop(_, _) -> error(unexpected).

View File

@ -101,7 +101,10 @@
max_buffer_bytes => pos_integer(),
query_mode => query_mode(),
resume_interval => pos_integer(),
inflight_window => pos_integer()
inflight_window => pos_integer(),
%% Only for `emqx_resource_manager' usage. If false, prevents spawning buffer
%% workers, regardless of resource query mode.
spawn_buffer_workers => boolean()
}.
-type query_result() ::
ok

View File

@ -136,16 +136,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
% Create metrics for the resource
ok = emqx_resource:create_metrics(ResId),
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
case QueryMode of
%% the resource has built-in buffer, so there is no need for resource workers
simple_sync_internal_buffer ->
ok;
simple_async_internal_buffer ->
ok;
%% The resource is a consumer resource, so there is no need for resource workers
no_queries ->
ok;
_ ->
SpawnBufferWorkers = maps:get(spawn_buffer_workers, Opts, true),
case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of
true ->
%% start resource workers as the query type requires them
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
@ -153,7 +146,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
false ->
ok
end
end;
false ->
ok
end.
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.