diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index e797012a2..554274697 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -141,9 +141,14 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> ?tp(connector_post_config_update_done, #{}), Result; post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> - ok = emqx_connector_resource:remove(Type, Name), - ?tp(connector_post_config_update_done, #{}), - ok; + case emqx_connector_resource:get_channels(Type, Name) of + {ok, []} -> + ok = emqx_connector_resource:remove(Type, Name), + ?tp(connector_post_config_update_done, #{}), + ok; + {ok, Channels} -> + {error, {active_channels, Channels}} + end; post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> ok = emqx_connector_resource:create(Type, Name, NewConf), ?tp(connector_post_config_update_done, #{}), diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 93a22ab53..527225e5a 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -44,7 +44,8 @@ stop/2, update/2, update/3, - update/4 + update/4, + get_channels/2 ]). -callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) -> @@ -221,6 +222,9 @@ update(Type, Name, {OldConf, Conf}, Opts) -> ok end. +get_channels(Type, Name) -> + emqx_resource:get_channels(resource_id(Type, Name)). + recreate(Type, Name) -> recreate(Type, Name, emqx:get_config([connectors, Type, Name])). diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 7980402bc..49b6641bd 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -147,6 +147,58 @@ t_connector_lifecycle(_Config) -> ), ok. +t_remove_fail({'init', Config}) -> + meck:new(emqx_connector_ee_schema, [passthrough]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR), + meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), + meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{}}]), + meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR, on_stop, 2, ok), + meck:expect(?CONNECTOR, on_get_status, 2, connected), + [{mocked_mods, [?CONNECTOR, emqx_connector_ee_schema]} | Config]; +t_remove_fail({'end', Config}) -> + MockedMods = ?config(mocked_mods, Config), + meck:unload(MockedMods), + Config; +t_remove_fail(_Config) -> + ?assertEqual( + [], + emqx_connector:list() + ), + + ?assertMatch( + {ok, _}, + emqx_connector:create(kafka, my_failing_connector, connector_config()) + ), + + ?assertMatch( + {error, {post_config_update, emqx_connector, {active_channels, [{<<"my_channel">>, _}]}}}, + emqx_connector:remove(kafka, my_failing_connector) + ), + + ?assertNotEqual( + [], + emqx_connector:list() + ), + + ?assert(meck:validate(?CONNECTOR)), + ?assertMatch( + [ + {_, {?CONNECTOR, callback_mode, []}, _}, + {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}}, + {_, {?CONNECTOR, on_get_channels, [_]}, _}, + {_, {?CONNECTOR, on_add_channel, _}, {ok, connector_state}}, + {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected}, + {_, {?CONNECTOR, on_get_channels, [_]}, _} + ], + meck:history(?CONNECTOR) + ), + ok. + +%% helpers + connector_config() -> #{ <<"authentication">> => <<"none">>, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d79c8d966..cf8364e97 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -77,6 +77,7 @@ %% verify if the resource is working normally health_check/1, channel_health_check/2, + get_channels/1, %% set resource status to disconnected set_resource_status_connecting/1, %% stop the instance @@ -443,6 +444,10 @@ health_check(ResId) -> channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). +-spec get_channels(resource_id()) -> [{binary(), map()}]. +get_channels(ResId) -> + emqx_resource_manager:get_channels(ResId). + set_resource_status_connecting(ResId) -> emqx_resource_manager:set_resource_status_connecting(ResId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index b3021ae70..8f37bfa1a 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -32,7 +32,8 @@ health_check/1, channel_health_check/2, add_channel/3, - remove_channel/2 + remove_channel/2, + get_channels/1 ]). -export([ @@ -296,6 +297,9 @@ add_channel(ResId, ChannelId, Config) -> remove_channel(ResId, ChannelId) -> safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION). +get_channels(ResId) -> + safe_call(ResId, get_channels, ?T_OPERATION). + %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server @@ -426,6 +430,11 @@ handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> handle_remove_channel(From, ChannelId, Data); +handle_event( + {call, From}, get_channels, _State, Data +) -> + Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), + {keep_state_and_data, {reply, From, {ok, Channels}}}; handle_event(EventType, EventData, State, Data) -> ?SLOG( error,