fix(bridge v2 api): don't attempt to start disabled connector when starting action/source

Fixes https://emqx.atlassian.net/browse/EMQX-12435
This commit is contained in:
Thales Macedo Garitezi 2024-05-22 10:14:55 -03:00
parent d4acceb858
commit 4094032649
5 changed files with 144 additions and 8 deletions

View File

@ -987,14 +987,45 @@ call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType,
?BRIDGE_NOT_FOUND(BridgeType, BridgeName) ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end. end.
is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) -> is_enabled_bridge(ConfRootKey, ActionOrSourceType, BridgeName) ->
try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of try
emqx_bridge_v2:lookup(ConfRootKey, ActionOrSourceType, binary_to_existing_atom(BridgeName))
of
{ok, #{raw_config := ConfMap}} -> {ok, #{raw_config := ConfMap}} ->
maps:get(<<"enable">>, ConfMap, true); maps:get(<<"enable">>, ConfMap, true) andalso
is_connector_enabled(
ActionOrSourceType,
maps:get(<<"connector">>, ConfMap)
);
{error, not_found} -> {error, not_found} ->
throw(not_found) throw(not_found)
catch catch
error:badarg -> error:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found);
error:{badkey, _} ->
%% `connector' field not present. Should never happen if action/source schema
%% is properly defined.
throw(not_found)
end.
is_connector_enabled(ActionOrSourceType, ConnectorName0) ->
try
ConnectorType = emqx_bridge_v2:connector_type(ActionOrSourceType),
ConnectorName = to_existing_atom(ConnectorName0),
case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
undefined ->
throw(not_found);
Config = #{} ->
maps:get(enable, Config, true)
end
catch
throw:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found);
throw:bad_atom ->
%% catch non-existing atom, %% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage. %% none-existing atom means it is not available in config PT storage.
throw(not_found) throw(not_found)
@ -1407,3 +1438,9 @@ map_to_json(M0) ->
M2 = maps:without([value, <<"value">>], M1), M2 = maps:without([value, <<"value">>], M1),
emqx_utils_json:encode(M2) emqx_utils_json:encode(M2)
end. end.
to_existing_atom(X) ->
case emqx_utils:safe_to_existing_atom(X, utf8) of
{ok, A} -> A;
{error, _} -> throw(bad_atom)
end.

View File

@ -109,6 +109,7 @@
-define(SOURCE_TYPE_STR, "mqtt"). -define(SOURCE_TYPE_STR, "mqtt").
-define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>). -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
-define(SOURCE_CONNECTOR_TYPE, ?SOURCE_TYPE).
-define(APPSPECS, [ -define(APPSPECS, [
emqx_conf, emqx_conf,
@ -166,9 +167,19 @@ init_per_group(single = Group, Config) ->
Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]); init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
init_per_group(actions, Config) -> init_per_group(actions, Config) ->
[{bridge_kind, action} | Config]; [
{bridge_kind, action},
{connector_type, ?ACTION_CONNECTOR_TYPE},
{connector_name, ?ACTION_CONNECTOR_NAME}
| Config
];
init_per_group(sources, Config) -> init_per_group(sources, Config) ->
[{bridge_kind, source} | Config]; [
{bridge_kind, source},
{connector_type, ?SOURCE_CONNECTOR_TYPE},
{connector_name, ?SOURCE_CONNECTOR_NAME}
| Config
];
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
@ -202,14 +213,45 @@ end_per_group(single, Config) ->
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(t_action_types, Config) -> init_per_testcase(TestCase, Config) when
TestCase =:= t_start_action_or_source_with_disabled_connector;
TestCase =:= t_action_types
->
case ?config(cluster_nodes, Config) of case ?config(cluster_nodes, Config) of
undefined -> undefined ->
init_mocks(); init_mocks();
Nodes -> Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
end, end,
Config; #{
connector_config := ConnectorConfig,
bridge_type := BridgeType,
bridge_name := BridgeName,
bridge_config := BridgeConfig
} =
case ?config(bridge_kind, Config) of
action ->
#{
connector_config => ?ACTIONS_CONNECTOR,
bridge_type => {action_type, ?ACTION_TYPE},
bridge_name => {action_name, ?ACTION_CONNECTOR_NAME},
bridge_config => {action_config, ?KAFKA_BRIDGE(?ACTION_CONNECTOR_NAME)}
};
source ->
#{
connector_config => source_connector_create_config(#{}),
bridge_type => {source_type, ?SOURCE_TYPE},
bridge_name => {source_name, ?SOURCE_CONNECTOR_NAME},
bridge_config => {source_config, source_config_base()}
}
end,
[
{connector_config, ConnectorConfig},
BridgeType,
BridgeName,
BridgeConfig
| Config
];
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
case ?config(cluster_nodes, Config) of case ?config(cluster_nodes, Config) of
undefined -> undefined ->
@ -434,7 +476,7 @@ source_connector_create_config(Overrides0) ->
source_connector_config_base(), source_connector_config_base(),
#{ #{
<<"enable">> => true, <<"enable">> => true,
<<"type">> => ?SOURCE_TYPE, <<"type">> => ?SOURCE_CONNECTOR_TYPE,
<<"name">> => ?SOURCE_CONNECTOR_NAME <<"name">> => ?SOURCE_CONNECTOR_NAME
} }
), ),
@ -1547,3 +1589,12 @@ t_older_version_nodes_in_cluster(Config) ->
), ),
ok. ok.
t_start_action_or_source_with_disabled_connector(matrix) ->
[
[single, actions],
[single, sources]
];
t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok.

View File

@ -383,6 +383,25 @@ start_connector_api(ConnectorName, ConnectorType) ->
ct:pal("connector update (http) result:\n ~p", [Res]), ct:pal("connector update (http) result:\n ~p", [Res]),
Res. Res.
enable_connector_api(ConnectorType, ConnectorName) ->
do_enable_disable_connector_api(ConnectorType, ConnectorName, enable).
disable_connector_api(ConnectorType, ConnectorName) ->
do_enable_disable_connector_api(ConnectorType, ConnectorName, disable).
do_enable_disable_connector_api(ConnectorType, ConnectorName, Op) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
{OpPath, OpStr} =
case Op of
enable -> {"true", "enable"};
disable -> {"false", "disable"}
end,
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "enable", OpPath]),
ct:pal(OpStr ++ " connector ~s (http)", [ConnectorId]),
Res = request(put, Path, []),
ct:pal(OpStr ++ " connector ~s (http) result:\n ~p", [ConnectorId, Res]),
Res.
get_connector_api(ConnectorType, ConnectorName) -> get_connector_api(ConnectorType, ConnectorName) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName), ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]), Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
@ -956,3 +975,27 @@ t_on_get_status(Config, Opts) ->
) )
end, end,
ok. ok.
%% Verifies that attempting to start an action while its connnector is disabled does not
%% start the connector.
t_start_action_or_source_with_disabled_connector(Config) ->
#{
kind := Kind,
type := Type,
name := Name,
connector_type := ConnectorType,
connector_name := ConnectorName
} = get_common_values(Config),
?check_trace(
begin
{ok, _} = create_bridge_api(Config),
{ok, {{_, 204, _}, _, _}} = disable_connector_api(ConnectorType, ConnectorName),
?assertMatch(
{error, {{_, 400, _}, _, _}},
op_bridge_api(Kind, "start", Type, Name)
),
ok
end,
[]
),
ok.

View File

@ -228,3 +228,7 @@ t_sync_query(Config) ->
postgres_bridge_connector_on_query_return postgres_bridge_connector_on_query_return
), ),
ok. ok.
t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok.

View File

@ -0,0 +1 @@
Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.