From 1f1d9e58c6bd3da70c95c82da282715fca027e0f Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 20 Nov 2023 16:23:46 +0100 Subject: [PATCH] fix(emqx_connector): don't crash in API on delete with active channels --- .../emqx_connector/src/emqx_connector_api.erl | 2 +- .../test/emqx_connector_api_SUITE.erl | 143 ++++++++++++++++-- 2 files changed, 130 insertions(+), 15 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index b2267539b..f6e0c0f95 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -372,7 +372,7 @@ schema("/connectors_probe") -> case emqx_connector:remove(ConnectorType, ConnectorName) of ok -> ?NO_CONTENT; - {error, {active_channels, Channels}} -> + {error, {post_config_update, _HandlerMod, {active_channels, Channels}}} -> ?BAD_REQUEST( {<<"Cannot delete connector while there are active channels defined for this connector">>, Channels} diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f6609808f..bd8aa9ddf 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -25,7 +25,7 @@ -include_lib("snabbkaffe/include/test_macros.hrl"). -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). --define(CONNECTOR(NAME, TYPE), #{ +-define(RESOURCE(NAME, TYPE), #{ %<<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, <<"name">> => NAME @@ -52,12 +52,57 @@ -define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)). -define(KAFKA_CONNECTOR(Name, BootstrapHosts), maps:merge( - ?CONNECTOR(Name, ?CONNECTOR_TYPE), + ?RESOURCE(Name, ?CONNECTOR_TYPE), ?KAFKA_CONNECTOR_BASE(BootstrapHosts) ) ). -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). +-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). +-define(BRIDGE_TYPE_STR, "kafka_producer"). +-define(BRIDGE_TYPE, <>). +-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{ + <<"enable">> => true, + <<"connector">> => Connector, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"hybrid">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_ext_headers">> => [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"${clientid}">> + }, + #{ + <<"kafka_ext_header_key">> => <<"topic">>, + <<"kafka_ext_header_value">> => <<"${topic}">> + } + ], + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"kafka_headers">> => <<"${pub_props}">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"required_acks">> => <<"all_isr">>, + <<"topic">> => <<"kafka-topic">> + }, + <<"local_topic">> => <<"mqtt/local/topic">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"32s">> + } +}). +-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). + %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ %% <<"server">> => SERVER, @@ -105,7 +150,8 @@ emqx, emqx_auth, emqx_management, - {emqx_connector, "connectors {}"} + {emqx_connector, "connectors {}"}, + {emqx_bridge, "actions {}"} ]). -define(APPSPEC_DASHBOARD, @@ -128,7 +174,8 @@ all() -> groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_connectors_probe + t_connectors_probe, + t_fail_delete_with_action ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -187,29 +234,38 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. -init_per_testcase(_TestCase, Config) -> +init_per_testcase(TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> - init_mocks(); + init_mocks(TestCase); Nodes -> - [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + [erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes] end, Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(TestCase, Config) -> + Node = ?config(node, Config), + ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]), case ?config(cluster_nodes, Config) of undefined -> meck:unload(); Nodes -> - [erpc:call(Node, meck, unload, []) || Node <- Nodes] + [erpc:call(N, meck, unload, []) || N <- Nodes] end, - Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), - ok = erpc:call(Node, fun clear_resources/0), ok. -define(CONNECTOR_IMPL, dummy_connector_impl). -init_mocks() -> +init_mocks(t_fail_delete_with_action) -> + init_mocks(common), + meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), + ok; +init_mocks(_TestCase) -> meck:new(emqx_connector_ee_schema, [passthrough, no_link]), meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), @@ -235,7 +291,15 @@ init_mocks() -> ), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. -clear_resources() -> +clear_resources(t_fail_delete_with_action) -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + ok = emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + clear_resources(common); +clear_resources(_) -> lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_connector:remove(Type, Name) @@ -646,7 +710,7 @@ t_connectors_probe(Config) -> request_json( post, uri(["connectors_probe"]), - ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>), + ?RESOURCE(<<"broken_connector">>, <<"unknown_type">>), Config ) ), @@ -674,6 +738,57 @@ t_create_with_bad_name(Config) -> ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), ok. +t_fail_delete_with_action(Config) -> + Name = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := Name, + <<"kafka">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri(["actions"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% delete the connector + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + <<"{<<\"Cannot delete connector while there are active channels", + " defined for this connector\">>,", _/binary>> + }}, + request_json(delete, uri(["connectors", ConnectorID]), Config) + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],