From 27aff47c175e1f98d18f5d8c29fc6cfe5e7390f5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 27 Oct 2023 08:23:28 -0300 Subject: [PATCH] fix(connector): check that there are no active channels when removing --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 6 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 70 +++++++++++++++++++ apps/emqx_connector/src/emqx_connector.erl | 56 +++++++++++---- 3 files changed, 119 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 9e03a7d80..43e097ae3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -124,10 +124,14 @@ lookup(Type, Name) -> ?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector ), %% The connector should always exist + %% ... but, in theory, there might be no channels associated to it when we try + %% to delete the connector, and then this reference will become dangling... InstanceData = case emqx_resource:get_instance(ConnectorId) of {ok, _, Data} -> - Data + Data; + {error, not_found} -> + undefined end, {ok, #{ type => Type, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 71e9fe65c..45589909e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -513,3 +513,73 @@ t_update_connector_not_found(_Config) -> emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) ), ok. + +t_remove_single_connector_being_referenced_with_active_channels(_Config) -> + %% we test the connector post config update here because we also need bridges. + Conf = bridge_config(), + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + ?assertMatch( + {error, {post_config_update, _HandlerMod, {active_channels, [_ | _]}}}, + emqx_connector:remove(con_type(), con_name()) + ), + ok. + +t_remove_single_connector_being_referenced_without_active_channels(_Config) -> + %% we test the connector post config update here because we also need bridges. + Conf = bridge_config(), + BridgeName = my_test_bridge, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + emqx_common_test_helpers:with_mock( + emqx_bridge_v2_test_connector, + on_get_channels, + fun(_ResId) -> [] end, + fun() -> + ?assertMatch({ok, _}, emqx_connector:remove(con_type(), con_name())), + %% we no longer have connector data if this happens... + ?assertMatch( + {ok, #{resource_data := undefined}}, + emqx_bridge_v2:lookup(bridge_type(), BridgeName) + ), + ok + end + ), + ok. + +t_remove_multiple_connectors_being_referenced_with_channels(_Config) -> + Conf = bridge_config(), + BridgeName = my_test_bridge, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + reason := "connector_has_active_channels", + type := _, + connector_name := _, + active_channels := [_ | _] + }}}, + emqx_conf:update([connectors], #{}, #{override_to => cluster}) + ), + ok. + +t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> + Conf = bridge_config(), + BridgeName = my_test_bridge, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + emqx_common_test_helpers:with_mock( + emqx_bridge_v2_test_connector, + on_get_channels, + fun(_ResId) -> [] end, + fun() -> + ?assertMatch( + {ok, _}, + emqx_conf:update([connectors], #{}, #{override_to => cluster}) + ), + %% we no longer have connector data if this happens... + ?assertMatch( + {ok, #{resource_data := undefined}}, + emqx_bridge_v2:lookup(bridge_type(), BridgeName) + ), + ok + end + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 996c13bbf..bd46919f5 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -128,18 +128,23 @@ operation_to_enable(enable) -> true. post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_connector_changes` failed. - Result = perform_connector_changes([ - #{action => fun emqx_connector_resource:remove/4, data => Removed}, - #{ - action => fun emqx_connector_resource:create/3, - data => Added, - on_exception_fn => fun emqx_connector_resource:remove/4 - }, - #{action => fun emqx_connector_resource:update/4, data => Updated} - ]), - ?tp(connector_post_config_update_done, #{}), - Result; + case ensure_no_channels(Removed) of + ok -> + %% The config update will be failed if any task in `perform_connector_changes` failed. + Result = perform_connector_changes([ + #{action => fun emqx_connector_resource:remove/4, data => Removed}, + #{ + action => fun emqx_connector_resource:create/3, + data => Added, + on_exception_fn => fun emqx_connector_resource:remove/4 + }, + #{action => fun emqx_connector_resource:update/4, data => Updated} + ]), + ?tp(connector_post_config_update_done, #{}), + Result; + {error, Error} -> + {error, Error} + end; post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> case emqx_connector_resource:get_channels(Type, Name) of {ok, []} -> @@ -419,3 +424,30 @@ get_basic_usage_info() -> _:_ -> InitialAcc end. + +ensure_no_channels(Configs) -> + Pipeline = + lists:map( + fun({Type, ConnectorName}) -> + fun(_) -> + case emqx_connector_resource:get_channels(Type, ConnectorName) of + {ok, []} -> + ok; + {ok, Channels} -> + {error, #{ + reason => "connector_has_active_channels", + type => Type, + connector_name => ConnectorName, + active_channels => Channels + }} + end + end + end, + maps:keys(Configs) + ), + case emqx_utils:pipeline(Pipeline, unused, unused) of + {ok, _, _} -> + ok; + {error, Reason, _State} -> + {error, Reason} + end.