diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index e33e1ca07..56b2cb4ed 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -987,14 +987,45 @@ call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType, ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) end. -is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) -> - try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of +is_enabled_bridge(ConfRootKey, ActionOrSourceType, BridgeName) -> + try + emqx_bridge_v2:lookup(ConfRootKey, ActionOrSourceType, binary_to_existing_atom(BridgeName)) + of {ok, #{raw_config := ConfMap}} -> - maps:get(<<"enable">>, ConfMap, true); + maps:get(<<"enable">>, ConfMap, true) andalso + is_connector_enabled( + ActionOrSourceType, + maps:get(<<"connector">>, ConfMap) + ); {error, not_found} -> throw(not_found) catch error:badarg -> + %% catch non-existing atom, + %% none-existing atom means it is not available in config PT storage. + throw(not_found); + error:{badkey, _} -> + %% `connector' field not present. Should never happen if action/source schema + %% is properly defined. + throw(not_found) + end. + +is_connector_enabled(ActionOrSourceType, ConnectorName0) -> + try + ConnectorType = emqx_bridge_v2:connector_type(ActionOrSourceType), + ConnectorName = to_existing_atom(ConnectorName0), + case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of + undefined -> + throw(not_found); + Config = #{} -> + maps:get(enable, Config, true) + end + catch + throw:badarg -> + %% catch non-existing atom, + %% none-existing atom means it is not available in config PT storage. + throw(not_found); + throw:bad_atom -> %% catch non-existing atom, %% none-existing atom means it is not available in config PT storage. throw(not_found) @@ -1407,3 +1438,9 @@ map_to_json(M0) -> M2 = maps:without([value, <<"value">>], M1), emqx_utils_json:encode(M2) end. + +to_existing_atom(X) -> + case emqx_utils:safe_to_existing_atom(X, utf8) of + {ok, A} -> A; + {error, _} -> throw(bad_atom) + end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index b1e1ac38d..039402738 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -109,6 +109,7 @@ -define(SOURCE_TYPE_STR, "mqtt"). -define(SOURCE_TYPE, <>). +-define(SOURCE_CONNECTOR_TYPE, ?SOURCE_TYPE). -define(APPSPECS, [ emqx_conf, @@ -166,9 +167,19 @@ init_per_group(single = Group, Config) -> Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]); init_per_group(actions, Config) -> - [{bridge_kind, action} | Config]; + [ + {bridge_kind, action}, + {connector_type, ?ACTION_CONNECTOR_TYPE}, + {connector_name, ?ACTION_CONNECTOR_NAME} + | Config + ]; init_per_group(sources, Config) -> - [{bridge_kind, source} | Config]; + [ + {bridge_kind, source}, + {connector_type, ?SOURCE_CONNECTOR_TYPE}, + {connector_name, ?SOURCE_CONNECTOR_NAME} + | Config + ]; init_per_group(_Group, Config) -> Config. @@ -202,14 +213,45 @@ end_per_group(single, Config) -> end_per_group(_Group, _Config) -> ok. -init_per_testcase(t_action_types, Config) -> +init_per_testcase(TestCase, Config) when + TestCase =:= t_start_action_or_source_with_disabled_connector; + TestCase =:= t_action_types +-> case ?config(cluster_nodes, Config) of undefined -> init_mocks(); Nodes -> [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] end, - Config; + #{ + connector_config := ConnectorConfig, + bridge_type := BridgeType, + bridge_name := BridgeName, + bridge_config := BridgeConfig + } = + case ?config(bridge_kind, Config) of + action -> + #{ + connector_config => ?ACTIONS_CONNECTOR, + bridge_type => {action_type, ?ACTION_TYPE}, + bridge_name => {action_name, ?ACTION_CONNECTOR_NAME}, + bridge_config => {action_config, ?KAFKA_BRIDGE(?ACTION_CONNECTOR_NAME)} + }; + source -> + #{ + connector_config => source_connector_create_config(#{}), + bridge_type => {source_type, ?SOURCE_TYPE}, + bridge_name => {source_name, ?SOURCE_CONNECTOR_NAME}, + bridge_config => {source_config, source_config_base()} + } + end, + [ + {connector_config, ConnectorConfig}, + BridgeType, + BridgeName, + BridgeConfig + | Config + ]; init_per_testcase(_TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> @@ -434,7 +476,7 @@ source_connector_create_config(Overrides0) -> source_connector_config_base(), #{ <<"enable">> => true, - <<"type">> => ?SOURCE_TYPE, + <<"type">> => ?SOURCE_CONNECTOR_TYPE, <<"name">> => ?SOURCE_CONNECTOR_NAME } ), @@ -1547,3 +1589,12 @@ t_older_version_nodes_in_cluster(Config) -> ), ok. + +t_start_action_or_source_with_disabled_connector(matrix) -> + [ + [single, actions], + [single, sources] + ]; +t_start_action_or_source_with_disabled_connector(Config) -> + ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), + ok. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index b057a648e..82858f00b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -383,6 +383,25 @@ start_connector_api(ConnectorName, ConnectorType) -> ct:pal("connector update (http) result:\n ~p", [Res]), Res. +enable_connector_api(ConnectorType, ConnectorName) -> + do_enable_disable_connector_api(ConnectorType, ConnectorName, enable). + +disable_connector_api(ConnectorType, ConnectorName) -> + do_enable_disable_connector_api(ConnectorType, ConnectorName, disable). + +do_enable_disable_connector_api(ConnectorType, ConnectorName, Op) -> + ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName), + {OpPath, OpStr} = + case Op of + enable -> {"true", "enable"}; + disable -> {"false", "disable"} + end, + Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "enable", OpPath]), + ct:pal(OpStr ++ " connector ~s (http)", [ConnectorId]), + Res = request(put, Path, []), + ct:pal(OpStr ++ " connector ~s (http) result:\n ~p", [ConnectorId, Res]), + Res. + get_connector_api(ConnectorType, ConnectorName) -> ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName), Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]), @@ -956,3 +975,27 @@ t_on_get_status(Config, Opts) -> ) end, ok. + +%% Verifies that attempting to start an action while its connnector is disabled does not +%% start the connector. +t_start_action_or_source_with_disabled_connector(Config) -> + #{ + kind := Kind, + type := Type, + name := Name, + connector_type := ConnectorType, + connector_name := ConnectorName + } = get_common_values(Config), + ?check_trace( + begin + {ok, _} = create_bridge_api(Config), + {ok, {{_, 204, _}, _, _}} = disable_connector_api(ConnectorType, ConnectorName), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + op_bridge_api(Kind, "start", Type, Name) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl index 561783760..9dff5ab22 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -228,3 +228,7 @@ t_sync_query(Config) -> postgres_bridge_connector_on_query_return ), ok. + +t_start_action_or_source_with_disabled_connector(Config) -> + ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), + ok. diff --git a/changes/ce/fix-13090.en.md b/changes/ce/fix-13090.en.md new file mode 100644 index 000000000..ee3593036 --- /dev/null +++ b/changes/ce/fix-13090.en.md @@ -0,0 +1 @@ +Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.