fix: fail remove connector with active channels

This commit is contained in:
Stefan Strigler 2023-10-03 16:44:00 +02:00 committed by Zaiming (Stone) Shi
parent cc864f4821
commit d05f2010b3
5 changed files with 80 additions and 5 deletions

View File

@ -141,9 +141,14 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
?tp(connector_post_config_update_done, #{}),
Result;
post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ok = emqx_connector_resource:remove(Type, Name),
?tp(connector_post_config_update_done, #{}),
ok;
case emqx_connector_resource:get_channels(Type, Name) of
{ok, []} ->
ok = emqx_connector_resource:remove(Type, Name),
?tp(connector_post_config_update_done, #{}),
ok;
{ok, Channels} ->
{error, {active_channels, Channels}}
end;
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) ->
ok = emqx_connector_resource:create(Type, Name, NewConf),
?tp(connector_post_config_update_done, #{}),

View File

@ -44,7 +44,8 @@
stop/2,
update/2,
update/3,
update/4
update/4,
get_channels/2
]).
-callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) ->
@ -221,6 +222,9 @@ update(Type, Name, {OldConf, Conf}, Opts) ->
ok
end.
get_channels(Type, Name) ->
emqx_resource:get_channels(resource_id(Type, Name)).
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_config([connectors, Type, Name])).

View File

@ -147,6 +147,58 @@ t_connector_lifecycle(_Config) ->
),
ok.
t_remove_fail({'init', Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{}}]),
meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
[{mocked_mods, [?CONNECTOR, emqx_connector_ee_schema]} | Config];
t_remove_fail({'end', Config}) ->
MockedMods = ?config(mocked_mods, Config),
meck:unload(MockedMods),
Config;
t_remove_fail(_Config) ->
?assertEqual(
[],
emqx_connector:list()
),
?assertMatch(
{ok, _},
emqx_connector:create(kafka, my_failing_connector, connector_config())
),
?assertMatch(
{error, {post_config_update, emqx_connector, {active_channels, [{<<"my_channel">>, _}]}}},
emqx_connector:remove(kafka, my_failing_connector)
),
?assertNotEqual(
[],
emqx_connector:list()
),
?assert(meck:validate(?CONNECTOR)),
?assertMatch(
[
{_, {?CONNECTOR, callback_mode, []}, _},
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
{_, {?CONNECTOR, on_get_channels, [_]}, _},
{_, {?CONNECTOR, on_add_channel, _}, {ok, connector_state}},
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
{_, {?CONNECTOR, on_get_channels, [_]}, _}
],
meck:history(?CONNECTOR)
),
ok.
%% helpers
connector_config() ->
#{
<<"authentication">> => <<"none">>,

View File

@ -77,6 +77,7 @@
%% verify if the resource is working normally
health_check/1,
channel_health_check/2,
get_channels/1,
%% set resource status to disconnected
set_resource_status_connecting/1,
%% stop the instance
@ -443,6 +444,10 @@ health_check(ResId) ->
channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId).
-spec get_channels(resource_id()) -> [{binary(), map()}].
get_channels(ResId) ->
emqx_resource_manager:get_channels(ResId).
set_resource_status_connecting(ResId) ->
emqx_resource_manager:set_resource_status_connecting(ResId).

View File

@ -32,7 +32,8 @@
health_check/1,
channel_health_check/2,
add_channel/3,
remove_channel/2
remove_channel/2,
get_channels/1
]).
-export([
@ -296,6 +297,9 @@ add_channel(ResId, ChannelId, Config) ->
remove_channel(ResId, ChannelId) ->
safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION).
get_channels(ResId) ->
safe_call(ResId, get_channels, ?T_OPERATION).
%% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server
@ -426,6 +430,11 @@ handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data
) ->
handle_remove_channel(From, ChannelId, Data);
handle_event(
{call, From}, get_channels, _State, Data
) ->
Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
{keep_state_and_data, {reply, From, {ok, Channels}}};
handle_event(EventType, EventData, State, Data) ->
?SLOG(
error,