fix(connector): check that there are no active channels when removing
This commit is contained in:
parent
5f17a8f2ce
commit
27aff47c17
|
@ -124,10 +124,14 @@ lookup(Type, Name) ->
|
|||
?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector
|
||||
),
|
||||
%% The connector should always exist
|
||||
%% ... but, in theory, there might be no channels associated to it when we try
|
||||
%% to delete the connector, and then this reference will become dangling...
|
||||
InstanceData =
|
||||
case emqx_resource:get_instance(ConnectorId) of
|
||||
{ok, _, Data} ->
|
||||
Data
|
||||
Data;
|
||||
{error, not_found} ->
|
||||
undefined
|
||||
end,
|
||||
{ok, #{
|
||||
type => Type,
|
||||
|
|
|
@ -513,3 +513,73 @@ t_update_connector_not_found(_Config) ->
|
|||
emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
|
||||
%% we test the connector post config update here because we also need bridges.
|
||||
Conf = bridge_config(),
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
|
||||
?assertMatch(
|
||||
{error, {post_config_update, _HandlerMod, {active_channels, [_ | _]}}},
|
||||
emqx_connector:remove(con_type(), con_name())
|
||||
),
|
||||
ok.
|
||||
|
||||
t_remove_single_connector_being_referenced_without_active_channels(_Config) ->
|
||||
%% we test the connector post config update here because we also need bridges.
|
||||
Conf = bridge_config(),
|
||||
BridgeName = my_test_bridge,
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
||||
emqx_common_test_helpers:with_mock(
|
||||
emqx_bridge_v2_test_connector,
|
||||
on_get_channels,
|
||||
fun(_ResId) -> [] end,
|
||||
fun() ->
|
||||
?assertMatch({ok, _}, emqx_connector:remove(con_type(), con_name())),
|
||||
%% we no longer have connector data if this happens...
|
||||
?assertMatch(
|
||||
{ok, #{resource_data := undefined}},
|
||||
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_remove_multiple_connectors_being_referenced_with_channels(_Config) ->
|
||||
Conf = bridge_config(),
|
||||
BridgeName = my_test_bridge,
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
reason := "connector_has_active_channels",
|
||||
type := _,
|
||||
connector_name := _,
|
||||
active_channels := [_ | _]
|
||||
}}},
|
||||
emqx_conf:update([connectors], #{}, #{override_to => cluster})
|
||||
),
|
||||
ok.
|
||||
|
||||
t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
|
||||
Conf = bridge_config(),
|
||||
BridgeName = my_test_bridge,
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
||||
emqx_common_test_helpers:with_mock(
|
||||
emqx_bridge_v2_test_connector,
|
||||
on_get_channels,
|
||||
fun(_ResId) -> [] end,
|
||||
fun() ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update([connectors], #{}, #{override_to => cluster})
|
||||
),
|
||||
%% we no longer have connector data if this happens...
|
||||
?assertMatch(
|
||||
{ok, #{resource_data := undefined}},
|
||||
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -128,18 +128,23 @@ operation_to_enable(enable) -> true.
|
|||
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
||||
#{added := Added, removed := Removed, changed := Updated} =
|
||||
diff_confs(NewConf, OldConf),
|
||||
%% The config update will be failed if any task in `perform_connector_changes` failed.
|
||||
Result = perform_connector_changes([
|
||||
#{action => fun emqx_connector_resource:remove/4, data => Removed},
|
||||
#{
|
||||
action => fun emqx_connector_resource:create/3,
|
||||
data => Added,
|
||||
on_exception_fn => fun emqx_connector_resource:remove/4
|
||||
},
|
||||
#{action => fun emqx_connector_resource:update/4, data => Updated}
|
||||
]),
|
||||
?tp(connector_post_config_update_done, #{}),
|
||||
Result;
|
||||
case ensure_no_channels(Removed) of
|
||||
ok ->
|
||||
%% The config update will be failed if any task in `perform_connector_changes` failed.
|
||||
Result = perform_connector_changes([
|
||||
#{action => fun emqx_connector_resource:remove/4, data => Removed},
|
||||
#{
|
||||
action => fun emqx_connector_resource:create/3,
|
||||
data => Added,
|
||||
on_exception_fn => fun emqx_connector_resource:remove/4
|
||||
},
|
||||
#{action => fun emqx_connector_resource:update/4, data => Updated}
|
||||
]),
|
||||
?tp(connector_post_config_update_done, #{}),
|
||||
Result;
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end;
|
||||
post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
||||
case emqx_connector_resource:get_channels(Type, Name) of
|
||||
{ok, []} ->
|
||||
|
@ -419,3 +424,30 @@ get_basic_usage_info() ->
|
|||
_:_ ->
|
||||
InitialAcc
|
||||
end.
|
||||
|
||||
ensure_no_channels(Configs) ->
|
||||
Pipeline =
|
||||
lists:map(
|
||||
fun({Type, ConnectorName}) ->
|
||||
fun(_) ->
|
||||
case emqx_connector_resource:get_channels(Type, ConnectorName) of
|
||||
{ok, []} ->
|
||||
ok;
|
||||
{ok, Channels} ->
|
||||
{error, #{
|
||||
reason => "connector_has_active_channels",
|
||||
type => Type,
|
||||
connector_name => ConnectorName,
|
||||
active_channels => Channels
|
||||
}}
|
||||
end
|
||||
end
|
||||
end,
|
||||
maps:keys(Configs)
|
||||
),
|
||||
case emqx_utils:pipeline(Pipeline, unused, unused) of
|
||||
{ok, _, _} ->
|
||||
ok;
|
||||
{error, Reason, _State} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue