fix(emqx_connector): don't crash in API on delete with active channels

This commit is contained in:
Stefan Strigler 2023-11-20 16:23:46 +01:00
parent b42d4e5ab0
commit 1f1d9e58c6
2 changed files with 130 additions and 15 deletions

View File

@ -372,7 +372,7 @@ schema("/connectors_probe") ->
case emqx_connector:remove(ConnectorType, ConnectorName) of
ok ->
?NO_CONTENT;
{error, {active_channels, Channels}} ->
{error, {post_config_update, _HandlerMod, {active_channels, Channels}}} ->
?BAD_REQUEST(
{<<"Cannot delete connector while there are active channels defined for this connector">>,
Channels}

View File

@ -25,7 +25,7 @@
-include_lib("snabbkaffe/include/test_macros.hrl").
-define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(CONNECTOR(NAME, TYPE), #{
-define(RESOURCE(NAME, TYPE), #{
%<<"ssl">> => #{<<"enable">> => false},
<<"type">> => TYPE,
<<"name">> => NAME
@ -52,12 +52,57 @@
-define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)).
-define(KAFKA_CONNECTOR(Name, BootstrapHosts),
maps:merge(
?CONNECTOR(Name, ?CONNECTOR_TYPE),
?RESOURCE(Name, ?CONNECTOR_TYPE),
?KAFKA_CONNECTOR_BASE(BootstrapHosts)
)
).
-define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(BRIDGE_TYPE_STR, "kafka_producer").
-define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{
<<"enable">> => true,
<<"connector">> => Connector,
<<"kafka">> => #{
<<"buffer">> => #{
<<"memory_overload_protection">> => true,
<<"mode">> => <<"hybrid">>,
<<"per_partition_limit">> => <<"2GB">>,
<<"segment_bytes">> => <<"100MB">>
},
<<"compression">> => <<"no_compression">>,
<<"kafka_ext_headers">> => [
#{
<<"kafka_ext_header_key">> => <<"clientid">>,
<<"kafka_ext_header_value">> => <<"${clientid}">>
},
#{
<<"kafka_ext_header_key">> => <<"topic">>,
<<"kafka_ext_header_value">> => <<"${topic}">>
}
],
<<"kafka_header_value_encode_mode">> => <<"none">>,
<<"kafka_headers">> => <<"${pub_props}">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_inflight">> => 10,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"timestamp">> => <<"${.timestamp}">>,
<<"value">> => <<"${.}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,
<<"partition_strategy">> => <<"random">>,
<<"required_acks">> => <<"all_isr">>,
<<"topic">> => <<"kafka-topic">>
},
<<"local_topic">> => <<"mqtt/local/topic">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"32s">>
}
}).
-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
%% <<"server">> => SERVER,
@ -105,7 +150,8 @@
emqx,
emqx_auth,
emqx_management,
{emqx_connector, "connectors {}"}
{emqx_connector, "connectors {}"},
{emqx_bridge, "actions {}"}
]).
-define(APPSPEC_DASHBOARD,
@ -128,7 +174,8 @@ all() ->
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [
t_connectors_probe
t_connectors_probe,
t_fail_delete_with_action
],
ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics
@ -187,29 +234,38 @@ end_per_group(_, Config) ->
emqx_cth_suite:stop(?config(group_apps, Config)),
ok.
init_per_testcase(_TestCase, Config) ->
init_per_testcase(TestCase, Config) ->
case ?config(cluster_nodes, Config) of
undefined ->
init_mocks();
init_mocks(TestCase);
Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
[erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes]
end,
Config.
end_per_testcase(_TestCase, Config) ->
end_per_testcase(TestCase, Config) ->
Node = ?config(node, Config),
ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]),
case ?config(cluster_nodes, Config) of
undefined ->
meck:unload();
Nodes ->
[erpc:call(Node, meck, unload, []) || Node <- Nodes]
[erpc:call(N, meck, unload, []) || N <- Nodes]
end,
Node = ?config(node, Config),
ok = emqx_common_test_helpers:call_janitor(),
ok = erpc:call(Node, fun clear_resources/0),
ok.
-define(CONNECTOR_IMPL, dummy_connector_impl).
init_mocks() ->
init_mocks(t_fail_delete_with_action) ->
init_mocks(common),
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId)
end),
ok;
init_mocks(_TestCase) ->
meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
@ -235,7 +291,15 @@ init_mocks() ->
),
[?CONNECTOR_IMPL, emqx_connector_ee_schema].
clear_resources() ->
clear_resources(t_fail_delete_with_action) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name)
end,
emqx_bridge_v2:list()
),
clear_resources(common);
clear_resources(_) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name)
@ -646,7 +710,7 @@ t_connectors_probe(Config) ->
request_json(
post,
uri(["connectors_probe"]),
?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>),
?RESOURCE(<<"broken_connector">>, <<"unknown_type">>),
Config
)
),
@ -674,6 +738,57 @@ t_create_with_bad_name(Config) ->
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
ok.
t_fail_delete_with_action(Config) ->
Name = ?CONNECTOR_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?CONNECTOR_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _]
}},
request_json(
post,
uri(["connectors"]),
?KAFKA_CONNECTOR(Name),
Config
)
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
BridgeName = ?BRIDGE_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := Name,
<<"kafka">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
request_json(
post,
uri(["actions"]),
?KAFKA_BRIDGE(?BRIDGE_NAME),
Config
)
),
%% delete the connector
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{<<\"Cannot delete connector while there are active channels",
" defined for this connector\">>,", _/binary>>
}},
request_json(delete, uri(["connectors", ConnectorID]), Config)
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],