emqx/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

723 lines
24 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
%% Module that setup_mocks/0 makes the mocked
%% `emqx_connector_resource:connector_to_resource_type/1' return.
con_mod() -> emqx_bridge_v2_test_connector.
%% Connector type used by the suite; it is the very same atom as the
%% bridge type.
con_type() ->
    Type = bridge_type(),
    Type.
%% Name of the connector that init_per_testcase/2 creates.
con_name() -> my_connector.
%% Resource id that `emqx_connector_resource:resource_id/2' computes for the
%% suite's connector type and name.
connector_resource_id() ->
    Type = con_type(),
    Name = con_name(),
    emqx_connector_resource:resource_id(Type, Name).
%% Bridge type atom used by every test case in this suite.
bridge_type() -> test_bridge_type.
%% Schema field list that setup_mocks/0 installs as the result of the mocked
%% `emqx_connector_schema:fields/1': a single optional field, keyed by the
%% connector type, whose value is a map from name to an arbitrary map.
con_schema() ->
    FieldType = hoconsc:map(name, typerefl:map()),
    FieldMeta = #{
        desc => <<"Test Connector Config">>,
        required => false
    },
    [{con_type(), hoconsc:mk(FieldType, FieldMeta)}].
%% Raw configuration used when creating the test connector.
con_config() ->
    %% Set this to a low value to make the test run faster
    ResourceOpts = #{<<"health_check_interval">> => 100},
    #{
        <<"enable">> => true,
        <<"resource_opts">> => ResourceOpts
    }.
%% Bridge schema with default options, i.e. keyed by bridge_type().
bridge_schema() ->
    DefaultOpts = #{},
    bridge_schema(DefaultOpts).
%% Schema field list used as the result of the mocked
%% `emqx_bridge_v2_schema:fields/1'.
%%
%% Opts may carry `bridge_type' to key the single field by another type;
%% when absent, bridge_type() is used.
bridge_schema(Opts) ->
    FieldName = maps:get(bridge_type, Opts, bridge_type()),
    FieldType = hoconsc:map(name, typerefl:map()),
    FieldMeta = #{
        desc => <<"Test Bridge Config">>,
        required => false
    },
    [{FieldName, hoconsc:mk(FieldType, FieldMeta)}].
%% Raw bridge configuration shared by the test cases: it references the
%% suite's connector by name and names the registered process in `send_to'.
bridge_config() ->
    ConnectorName = atom_to_binary(con_name()),
    ResourceOpts = #{<<"resume_interval">> => 100},
    #{
        <<"connector">> => ConnectorName,
        <<"enable">> => true,
        <<"send_to">> => registered_process_name(),
        <<"resource_opts">> => ResourceOpts
    }.
%% Name of the ETS table in which wrap_fun/1 stores funs.
fun_table_name() -> emqx_bridge_v2_SUITE_fun_table.
%% Name under which test cases register themselves to receive the messages
%% sent through a bridge.
registered_process_name() -> my_registered_process.
%% Common Test callback: the list of test cases is delegated to
%% `emqx_common_test_helpers:all/1' for this module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
%% List of applications; it has the same content as app_specs/0.
%% NOTE(review): no caller is visible in this part of the file.
start_apps() ->
    [emqx, emqx_conf, emqx_connector, emqx_bridge, emqx_rule_engine].
%% Installs the meck mocks that make the fake connector and bridge types
%% known to the schema, resource and bridge modules.
setup_mocks() ->
    Opts = [passthrough, no_link, no_history, non_strict],
    %% Any exception from meck:new/2 is swallowed by `catch' (presumably so
    %% that an already mocked module does not abort the setup).
    NewMock = fun(Mod) -> catch meck:new(Mod, Opts) end,
    NewMock(emqx_connector_schema),
    meck:expect(emqx_connector_schema, fields, 1, con_schema()),
    NewMock(emqx_connector_resource),
    meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
    NewMock(emqx_bridge_v2_schema),
    meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
    NewMock(emqx_bridge_v2),
    TypeAtom = bridge_type(),
    TypeBin = atom_to_binary(TypeAtom),
    %% The type lookup accepts the bridge type both as atom and as binary.
    ToConnectorType =
        fun(T) when T =:= TypeAtom; T =:= TypeBin -> con_type() end,
    meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, ToConnectorType),
    meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
    IsV2Type = fun(T) -> T =:= TypeAtom end,
    meck:expect(emqx_bridge_v2, is_bridge_v2_type, IsV2Type),
    ok.
%% Common Test callback: starts the applications listed in app_specs/0 and
%% stores the value returned by `emqx_cth_suite:start/2' under `apps'.
init_per_suite(Config) ->
    SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Started = emqx_cth_suite:start(app_specs(), SuiteOpts),
    [{apps, Started} | Config].
%% Common Test callback: stops what init_per_suite/1 stored under `apps'.
end_per_suite(Config) ->
    emqx_cth_suite:stop(?config(apps, Config)),
    ok.
%% Application specs handed to `emqx_cth_suite:start/2' in init_per_suite/1.
app_specs() ->
    [emqx, emqx_conf, emqx_connector, emqx_bridge, emqx_rule_engine].
%% Common Test callback run before every test case.
%%
%% Installs the mocks, creates the public named ETS table used by
%% wrap_fun/1 and unwrap_fun/1, creates the fake connector and records the
%% mocked modules under `mocked_mods' in the returned config.
init_per_testcase(_TestCase, Config) ->
    %% Setting up mocks for fake connector and bridge V2
    setup_mocks(),
    ets:new(fun_table_name(), [named_table, public]),
    %% Create a fake connector
    {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
    %% Keep this list in sync with setup_mocks/0, which mocks all four
    %% modules (emqx_bridge_v2_schema used to be missing here).
    MockedMods = [
        emqx_connector_schema,
        emqx_connector_resource,
        emqx_bridge_v2_schema,
        emqx_bridge_v2
    ],
    [{mocked_mods, MockedMods} | Config].
%% Common Test callback run after every test case.
%%
%% Drops the fun table created in init_per_testcase/2, removes every bridge
%% and connector, unloads all meck mocks and finally runs the janitor from
%% emqx_common_test_helpers.
end_per_testcase(_TestCase, _Config) ->
ets:delete(fun_table_name()),
delete_all_bridges_and_connectors(),
%% meck:unload/0 unloads every mocked module, not only those from setup_mocks/0
meck:unload(),
emqx_common_test_helpers:call_janitor(),
ok.
%% Removes every bridge returned by `emqx_bridge_v2:list/0', then every
%% connector returned by `emqx_connector:list/0', and finally resets the
%% bridges_v2 root config to an empty map.
delete_all_bridges_and_connectors() ->
    RemoveBridge = fun(#{name := BridgeName, type := BridgeType}) ->
        ct:pal("removing bridge ~p", [{BridgeType, BridgeName}]),
        emqx_bridge_v2:remove(BridgeType, BridgeName)
    end,
    RemoveConnector = fun(#{name := ConName, type := ConType}) ->
        ct:pal("removing connector ~p", [{ConType, ConName}]),
        emqx_connector:remove(ConType, ConName)
    end,
    %% Bridges go first, connectors afterwards.
    lists:foreach(RemoveBridge, emqx_bridge_v2:list()),
    lists:foreach(RemoveConnector, emqx_connector:list()),
    update_root_config(#{}),
    ok.
%% Hocon cannot hold a fun inside a config map, so the fun is parked in the
%% ETS table fun_table_name() under a unique key (a base64 encoded,
%% serialised reference) and that key is returned for use in the config.
wrap_fun(Fun) ->
    %% base64:encode/1 already yields a binary for binary input.
    Key = base64:encode(term_to_binary(make_ref())),
    ets:insert(fun_table_name(), {Key, Fun}),
    Key.
%% Looks up the fun stored by wrap_fun/1 under the key UniqRefStr.
%% ets:lookup_element/3 raises badarg when the key is not in the table.
unwrap_fun(UniqRefStr) ->
    Table = fun_table_name(),
    ets:lookup_element(Table, UniqRefStr, 2).
%% Replaces the whole `bridges_v2' root config with RootConf through
%% `emqx_conf:update/3', overriding to the cluster.
update_root_config(RootConf) ->
    ConfPath = [bridges_v2],
    UpdateOpts = #{override_to => cluster},
    emqx_conf:update(ConfPath, RootConf, UpdateOpts).
%% Replaces the whole `connectors' root config with RootConf through
%% `emqx_conf:update/3', overriding to the cluster.
update_root_connectors_config(RootConf) ->
    ConfPath = [connectors],
    UpdateOpts = #{override_to => cluster},
    emqx_conf:update(ConfPath, RootConf, UpdateOpts).
%% A bridge can be created and then removed again.
t_create_remove(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    {ok, _} = emqx_bridge_v2:create(Type, Name, bridge_config()),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% The length of `emqx_bridge_v2:list/0' follows bridge creation and removal.
t_list(_) ->
    Type = bridge_type(),
    Count = fun() -> length(emqx_bridge_v2:list()) end,
    [] = emqx_bridge_v2:list(),
    {ok, _} = emqx_bridge_v2:create(Type, my_test_bridge, bridge_config()),
    1 = Count(),
    {ok, _} = emqx_bridge_v2:create(Type, my_test_bridge2, bridge_config()),
    2 = Count(),
    ok = emqx_bridge_v2:remove(Type, my_test_bridge),
    1 = Count(),
    ok = emqx_bridge_v2:remove(Type, my_test_bridge2),
    0 = Count(),
    ok.
%% A dry run with the default bridge config succeeds.
t_create_dry_run(_) ->
    Type = bridge_type(),
    Conf = bridge_config(),
    ok = emqx_bridge_v2:create_dry_run(Type, Conf).
%% A dry run reports `{error, Msg}' when the `on_add_channel_fun' from the
%% config either returns `{error, Msg}' or throws Msg.
t_create_dry_run_fail_add_channel(_) ->
    Msg = <<"Failed to add channel">>,
    Type = bridge_type(),
    %% Case 1: the fun returns an error tuple
    ReturningFun = wrap_fun(fun() -> {error, Msg} end),
    ReturningConf = (bridge_config())#{on_add_channel_fun => ReturningFun},
    {error, Msg} = emqx_bridge_v2:create_dry_run(Type, ReturningConf),
    %% Case 2: the fun throws
    ThrowingFun = wrap_fun(fun() -> throw(Msg) end),
    ThrowingConf = (bridge_config())#{on_add_channel_fun => ThrowingFun},
    {error, Msg} = emqx_bridge_v2:create_dry_run(Type, ThrowingConf),
    ok.
%% A dry run fails when the `on_get_channel_status_fun' from the config
%% returns `{error, Msg}' (the same Msg is reported back) or throws (some
%% error is reported back).
t_create_dry_run_fail_get_channel_status(_) ->
    %% The text used to be "Failed to add channel", copied from the add
    %% channel test; this test case is about the channel status callback.
    Msg = <<"Failed to get channel status">>,
    Fun1 = wrap_fun(fun() ->
        {error, Msg}
    end),
    Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
    {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
    Fun2 = wrap_fun(fun() ->
        throw(Msg)
    end),
    Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2},
    %% Only the error tag is checked for the throwing variant.
    {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
    ok.
%% A dry run fails when the config names the connector
%% "connector_does_not_exist".
t_create_dry_run_connector_does_not_exist(_) ->
    MissingConnector = <<"connector_does_not_exist">>,
    Conf = (bridge_config())#{<<"connector">> => MissingConnector},
    {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf).
%% `emqx_bridge_v2:is_valid_bridge_v1/2' is true while the connector has a
%% single bridge, false once a second one is added, and true again for a
%% bridge that does not exist.
t_is_valid_bridge_v1(_) ->
    Type = bridge_type(),
    IsValid = fun(Name) -> emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, Name) end,
    {ok, _} = emqx_bridge_v2:create(Type, my_test_bridge, bridge_config()),
    true = IsValid(my_test_bridge),
    %% Add another channel/bridge to the connector
    {ok, _} = emqx_bridge_v2:create(Type, my_test_bridge_2, bridge_config()),
    false = IsValid(my_test_bridge),
    ok = emqx_bridge_v2:remove(Type, my_test_bridge),
    true = IsValid(my_test_bridge_2),
    ok = emqx_bridge_v2:remove(Type, my_test_bridge_2),
    %% Non existing bridge is a valid Bridge V1
    true = IsValid(my_test_bridge),
    ok.
%% A manually triggered health check of a default bridge yields `connected'.
t_manual_health_check(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    {ok, _} = emqx_bridge_v2:create(Type, Name, bridge_config()),
    %% Run a health check for the bridge
    connected = emqx_bridge_v2:health_check(Type, Name),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% A manual health check yields an error tuple when the channel status fun
%% throws.
t_manual_health_check_exception(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    ThrowingFun = wrap_fun(fun() -> throw(my_error) end),
    Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => ThrowingFun},
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    %% Run a health check for the bridge
    {error, _} = emqx_bridge_v2:health_check(Type, Name),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% A manual health check yields an error tuple when the channel status fun
%% raises with error/1.
t_manual_health_check_exception_error(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    RaisingFun = wrap_fun(fun() -> error(my_error) end),
    Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => RaisingFun},
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    %% Run a health check for the bridge
    {error, _} = emqx_bridge_v2:health_check(Type, Name),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% A manual health check hands back the `{error, my_error}' tuple returned
%% by the channel status fun.
t_manual_health_check_error(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    FailingFun = wrap_fun(fun() -> {error, my_error} end),
    Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => FailingFun},
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    %% Run a health check for the bridge
    {error, my_error} = emqx_bridge_v2:health_check(Type, Name),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% A message sent through the bridge arrives at the process registered as
%% registered_process_name().
t_send_message(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    Receiver = registered_process_name(),
    {ok, _} = emqx_bridge_v2:create(Type, Name, bridge_config()),
    %% Register name for this process
    register(Receiver, self()),
    _ = emqx_bridge_v2:send_message(Type, Name, <<"my_msg">>, #{}),
    receive
        <<"my_msg">> -> ok
    after 10000 ->
        ct:fail("Failed to receive message")
    end,
    unregister(Receiver),
    ok = emqx_bridge_v2:remove(Type, Name).
%% A message published on "t/a" reaches the process registered as
%% registered_process_name() through a rule whose only action is the bridge,
%% written as "<bridge type>:<bridge name>".
%%
%% The rule is deleted in an `after' section so that it is also removed when
%% the receive times out: end_per_testcase/2 only cleans up bridges and
%% connectors, so a rule left behind would presumably stay installed for the
%% test cases that follow.
t_send_message_through_rule(_) ->
    BridgeName = my_test_bridge,
    RuleId = atom_to_binary(?FUNCTION_NAME),
    {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
    %% Create a rule to send message to the bridge
    {ok, _} = emqx_rule_engine:create_rule(
        #{
            sql => <<"select * from \"t/a\"">>,
            id => RuleId,
            actions => [
                <<
                    (atom_to_binary(bridge_type()))/binary,
                    ":",
                    (atom_to_binary(BridgeName))/binary
                >>
            ],
            description => <<"bridge_v2 test rule">>
        }
    ),
    try
        %% Register name for this process
        register(registered_process_name(), self()),
        %% Send message to the topic
        ClientId = atom_to_binary(?FUNCTION_NAME),
        Payload = <<"hello">>,
        Msg = emqx_message:make(ClientId, 0, <<"t/a">>, Payload),
        emqx:publish(Msg),
        receive
            #{payload := Payload} ->
                ok
        after 10000 ->
            ct:fail("Failed to receive message")
        end,
        unregister(registered_process_name())
    after
        %% Runs on success and on failure alike.
        ok = emqx_rule_engine:delete_rule(RuleId)
    end,
    ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
    ok.
%% A message published on the bridge's `local_topic' reaches the process
%% registered as registered_process_name().
t_send_message_through_local_topic(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    Topic = <<"t/b">>,
    Receiver = registered_process_name(),
    %% Bridge configuration with local topic
    Conf = (bridge_config())#{<<"local_topic">> => Topic},
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    %% Register name for this process
    register(Receiver, self()),
    %% Send message to the topic
    Body = <<"hej">>,
    Publisher = atom_to_binary(?FUNCTION_NAME),
    emqx:publish(emqx_message:make(Publisher, 0, Topic, Body)),
    receive
        #{payload := Body} -> ok
    after 10000 ->
        ct:fail("Failed to receive message")
    end,
    unregister(Receiver),
    ok = emqx_bridge_v2:remove(Type, Name),
    ok.
%% Nothing is delivered while the channel status fun reports an error, and
%% delivery works once it reports `connected'. The status is switched at
%% run time through a public ETS table read by the wrapped fun.
t_send_message_unhealthy_channel(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    Receiver = registered_process_name(),
    StatusTab = ets:new(on_get_status_response_ets, [public]),
    ets:insert(StatusTab, {status_value, {error, my_error}}),
    StatusFun = wrap_fun(fun() -> ets:lookup_element(StatusTab, status_value, 2) end),
    Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => StatusFun},
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    %% Register name for this process
    register(Receiver, self()),
    _ = emqx_bridge_v2:send_message(Type, Name, <<"my_msg">>, #{timeout => 1}),
    receive
        Unexpected ->
            ct:pal("Received message: ~p", [Unexpected]),
            ct:fail("Should not get message here")
    after 1 ->
        ok
    end,
    %% Sending should work again after the channel is healthy
    ets:insert(StatusTab, {status_value, connected}),
    _ = emqx_bridge_v2:send_message(Type, Name, <<"my_msg">>, #{}),
    receive
        <<"my_msg">> -> ok
    after 10000 ->
        ct:fail("Failed to receive message")
    end,
    unregister(Receiver),
    ok = emqx_bridge_v2:remove(Type, Name).
%% Nothing is delivered (and one bridge_v2 alarm is active) while the
%% connector status fun reports `connecting'; once it reports `connected',
%% delivery works and the alarm is gone. The connector is a dedicated one
%% named after this test case, and its callbacks read their answers from a
%% public ETS table.
t_send_message_unhealthy_connector(_) ->
    Type = bridge_type(),
    Bridge = my_test_bridge,
    Receiver = registered_process_name(),
    Answers = ets:new(response_ets, [public]),
    ets:insert(Answers, {on_start_value, conf}),
    ets:insert(Answers, {on_get_status_value, connecting}),
    StartFun = wrap_fun(fun(Conf) ->
        %% `conf' stands for "start successfully with the given config";
        %% any other stored value is returned as is.
        case ets:lookup_element(Answers, on_start_value, 2) of
            conf -> {ok, Conf};
            Other -> Other
        end
    end),
    StatusFun = wrap_fun(fun() -> ets:lookup_element(Answers, on_get_status_value, 2) end),
    Overrides = #{
        <<"on_start_fun">> => StartFun,
        <<"on_get_status_fun">> => StatusFun,
        <<"resource_opts">> => #{<<"start_timeout">> => 100}
    },
    ConnectorConf = emqx_utils_maps:deep_merge(con_config(), Overrides),
    Connector = ?FUNCTION_NAME,
    {ok, _} = emqx_connector:create(con_type(), Connector, ConnectorConf),
    BridgeConf = (bridge_config())#{<<"connector">> => atom_to_binary(Connector)},
    {ok, _} = emqx_bridge_v2:create(Type, Bridge, BridgeConf),
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    %% Test that sending does not work when the connector is unhealthy (connecting)
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    register(Receiver, self()),
    _ = emqx_bridge_v2:send_message(Type, Bridge, <<"my_msg">>, #{timeout => 100}),
    receive
        Unexpected ->
            ct:pal("Received message: ~p", [Unexpected]),
            ct:fail("Should not get message here")
    after 10 ->
        ok
    end,
    %% We should have one alarm
    1 = get_bridge_v2_alarm_cnt(),
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    %% Test that sending works again when the connector is healthy (connected)
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    ets:insert(Answers, {on_get_status_value, connected}),
    _ = emqx_bridge_v2:send_message(Type, Bridge, <<"my_msg">>, #{timeout => 1000}),
    receive
        <<"my_msg">> -> ok
    after 1000 ->
        ct:fail("Failed to receive message")
    end,
    %% The alarm should be gone at this point
    0 = get_bridge_v2_alarm_cnt(),
    unregister(Receiver),
    ok = emqx_bridge_v2:remove(Type, Bridge),
    ok = emqx_connector:remove(con_type(), Connector),
    ets:delete(Answers),
    ok.
%% Creating a bridge whose channel status fun returns an error activates
%% exactly one bridge_v2 alarm; removing the bridge clears it.
t_unhealthy_channel_alarm(_) ->
    Type = bridge_type(),
    Name = my_test_bridge,
    FailingFun = wrap_fun(fun() -> {error, my_error} end),
    Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => FailingFun},
    0 = get_bridge_v2_alarm_cnt(),
    {ok, _} = emqx_bridge_v2:create(Type, Name, Conf),
    1 = get_bridge_v2_alarm_cnt(),
    ok = emqx_bridge_v2:remove(Type, Name),
    0 = get_bridge_v2_alarm_cnt(),
    ok.
%% Number of activated alarms whose `name' is a binary containing
%% "bridge_v2". Alarms without a binary name are not counted.
get_bridge_v2_alarm_cnt() ->
    Matching = [
        Alarm
     || #{name := Name} = Alarm <- emqx_alarm:get_alarms(activated),
        is_binary(Name),
        string:find(Name, "bridge_v2") =/= nomatch
    ],
    length(Matching).
%% Loading a bridges_v2 root config fails in the post config update step
%% when a bridge references the connector name "unknown", both for a bridge
%% that already exists and for a brand new one.
t_load_no_matching_connector(_Config) ->
    BaseConf = bridge_config(),
    TypeBin = atom_to_binary(bridge_type()),
    ExistingName = <<"my_test_bridge_update">>,
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), ExistingName, BaseConf)),
    %% `:=' is used on purpose: the "connector" key must already be present.
    DanglingConf = BaseConf#{<<"connector">> := <<"unknown">>},
    %% updating to invalid reference
    UpdateRoot = #{TypeBin => #{ExistingName => DanglingConf}},
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                bridge_name := my_test_bridge_update,
                connector_name := <<"unknown">>,
                bridge_type := _,
                reason := "connector_not_found_or_wrong_type"
            }}},
        update_root_config(UpdateRoot)
    ),
    %% creating new with invalid reference
    NewName = <<"my_test_bridge_new">>,
    CreateRoot = #{TypeBin => #{NewName => DanglingConf}},
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                bridge_name := my_test_bridge_new,
                connector_name := <<"unknown">>,
                bridge_type := _,
                reason := "connector_not_found_or_wrong_type"
            }}},
        update_root_config(CreateRoot)
    ),
    ok.
%% Exercises the post config update hook of the root config handler:
%% a bridge is created, updated and deleted purely by replacing the
%% bridges_v2 root config, and looked up after each step.
t_load_config_success(_Config) ->
    Type = bridge_type(),
    TypeBin = atom_to_binary(Type),
    Name = my_test_bridge_root,
    NameBin = atom_to_binary(Name),
    BaseConf = bridge_config(),
    RootWith = fun(BridgeConf) -> #{TypeBin => #{NameBin => BridgeConf}} end,
    %% pre-condition
    ?assertEqual(#{}, emqx_config:get([bridges_v2])),
    %% create
    ?assertMatch({ok, _}, update_root_config(RootWith(BaseConf))),
    ?assertMatch(
        {ok, #{
            type := Type,
            name := Name,
            raw_config := #{},
            resource_data := #{}
        }},
        emqx_bridge_v2:lookup(Type, Name)
    ),
    %% update
    ChangedConf = BaseConf#{<<"some_key">> => <<"new_value">>},
    ?assertMatch({ok, _}, update_root_config(RootWith(ChangedConf))),
    ?assertMatch(
        {ok, #{
            type := Type,
            name := Name,
            raw_config := #{<<"some_key">> := <<"new_value">>},
            resource_data := #{}
        }},
        emqx_bridge_v2:lookup(Type, Name)
    ),
    %% delete
    ?assertMatch({ok, _}, update_root_config(#{})),
    ?assertMatch({error, not_found}, emqx_bridge_v2:lookup(Type, Name)),
    ok.
%% Creating a bridge that names the connector "wrong_connector_name" is
%% rejected in the post config update step.
t_create_no_matching_connector(_Config) ->
    WrongConnector = <<"wrong_connector_name">>,
    Conf = (bridge_config())#{<<"connector">> => WrongConnector},
    Result = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                bridge_name := _,
                connector_name := _,
                bridge_type := _,
                reason := "connector_not_found_or_wrong_type"
            }}},
        Result
    ),
    ok.
%% Creating a bridge of type `wrong_type' is rejected in the post config
%% update step. The bridge schema mock is re-pointed so that its single
%% field is keyed by `wrong_type'.
t_create_wrong_connector_type(_Config) ->
    WrongTypeSchema = bridge_schema(#{bridge_type => wrong_type}),
    meck:expect(emqx_bridge_v2_schema, fields, 1, WrongTypeSchema),
    Result = emqx_bridge_v2:create(wrong_type, my_test_bridge, bridge_config()),
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                bridge_name := _,
                connector_name := _,
                bridge_type := wrong_type,
                reason := "connector_not_found_or_wrong_type"
            }}},
        Result
    ),
    ok.
%% Calling create again for an existing bridge, this time naming the
%% connector "wrong_connector_name", is rejected in the post config update
%% step.
t_update_connector_not_found(_Config) ->
    Type = bridge_type(),
    GoodConf = bridge_config(),
    ?assertMatch({ok, _}, emqx_bridge_v2:create(Type, my_test_bridge, GoodConf)),
    BadConf = GoodConf#{<<"connector">> => <<"wrong_connector_name">>},
    Result = emqx_bridge_v2:create(Type, my_test_bridge, BadConf),
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                bridge_name := _,
                connector_name := _,
                bridge_type := _,
                reason := "connector_not_found_or_wrong_type"
            }}},
        Result
    ),
    ok.
%% Removing the connector while a bridge uses it fails with a non-empty
%% `active_channels' list.
t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
    %% we test the connector post config update here because we also need bridges.
    BridgeConf = bridge_config(),
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf)),
    Result = emqx_connector:remove(con_type(), con_name()),
    ?assertMatch(
        {error, {post_config_update, _HandlerMod, {active_channels, [_ | _]}}},
        Result
    ),
    ok.
%% With `on_get_channels' of the test connector mocked to return no
%% channels, the connector can be removed even though a bridge still
%% references it; the bridge can still be looked up afterwards.
t_remove_single_connector_being_referenced_without_active_channels(_Config) ->
    %% we test the connector post config update here because we also need bridges.
    BridgeName = my_test_bridge,
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config())),
    NoChannels = fun(_ResId) -> [] end,
    Scenario = fun() ->
        ?assertMatch(ok, emqx_connector:remove(con_type(), con_name())),
        %% we no longer have connector data if this happens...
        ?assertMatch(
            {ok, #{resource_data := #{}}},
            emqx_bridge_v2:lookup(bridge_type(), BridgeName)
        ),
        ok
    end,
    emqx_common_test_helpers:with_mock(
        emqx_bridge_v2_test_connector,
        on_get_channels,
        NoChannels,
        Scenario
    ),
    ok.
%% Emptying the connectors root config while a bridge uses a connector is
%% rejected with reason "connector_has_active_channels".
t_remove_multiple_connectors_being_referenced_with_channels(_Config) ->
    Name = my_test_bridge,
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), Name, bridge_config())),
    Result = update_root_connectors_config(#{}),
    ?assertMatch(
        {error,
            {post_config_update, _HandlerMod, #{
                reason := "connector_has_active_channels",
                type := _,
                connector_name := _,
                active_channels := [_ | _]
            }}},
        Result
    ),
    ok.
%% With `on_get_channels' of the test connector mocked to return no
%% channels, the connectors root config can be emptied even though a bridge
%% still references a connector; the bridge can still be looked up
%% afterwards.
t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
    BridgeName = my_test_bridge,
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config())),
    NoChannels = fun(_ResId) -> [] end,
    Scenario = fun() ->
        ?assertMatch({ok, _}, update_root_connectors_config(#{})),
        %% we no longer have connector data if this happens...
        ?assertMatch(
            {ok, #{resource_data := #{}}},
            emqx_bridge_v2:lookup(bridge_type(), BridgeName)
        ),
        ok
    end,
    emqx_common_test_helpers:with_mock(
        emqx_bridge_v2_test_connector,
        on_get_channels,
        NoChannels,
        Scenario
    ),
    ok.