feat(bridge_v2): dry_run and specific test suite

This commit is contained in:
Kjell Winblad 2023-10-16 16:36:10 +02:00 committed by Zaiming (Stone) Shi
parent 04943ccbf0
commit e13196c1ca
4 changed files with 333 additions and 23 deletions

View File

@ -71,7 +71,8 @@
start/2,
stop/2,
restart/2,
reset_metrics/2
reset_metrics/2,
create_dry_run/2
]).
%% Config Update Handler API
@ -93,7 +94,7 @@ get_channels_for_connector(ConnectorId) ->
RelevantBridgeV2Types = [
Type
|| Type <- RootConf,
bridge_v2_type_to_connector_type(Type) =:= ConnectorType
?MODULE:bridge_v2_type_to_connector_type(Type) =:= ConnectorType
],
lists:flatten([
get_channels_for_connector(ConnectorName, BridgeV2Type)
@ -147,7 +148,7 @@ install_bridge_v2(
end,
%% If there is a running connector, we need to install the Bridge V2 in it
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config),
ok.
@ -170,13 +171,14 @@ uninstall_bridge_v2(
ok = emqx_resource:clear_metrics(BridgeV2Id),
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
get_query_mode(BridgeV2Type, Config) ->
CreationOpts = emqx_resource:fetch_creation_opts(Config),
ResourceType = emqx_bridge_resource:bridge_to_resource_type(BridgeV2Type),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
@ -209,7 +211,7 @@ health_check(BridgeType, BridgeName) ->
connector := ConnectorName
} ->
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeType), ConnectorName
?MODULE:bridge_v2_type_to_connector_type(BridgeType), ConnectorName
),
emqx_resource_manager:channel_health_check(
ConnectorId, id(BridgeType, BridgeName, ConnectorName)
@ -246,7 +248,7 @@ stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id),
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
@ -260,7 +262,7 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) -
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config).
@ -292,7 +294,7 @@ id(BridgeType, BridgeName) ->
end.
id(BridgeType, BridgeName, ConnectorName) ->
ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeType)),
ConnectorType = bin(?MODULE:bridge_v2_type_to_connector_type(BridgeType)),
<<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:",
(bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>.
@ -304,7 +306,7 @@ external_id(BridgeType, BridgeName) ->
<<Type/binary, ":", Name/binary>>.
bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) ->
bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin));
?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin));
bridge_v2_type_to_connector_type(kafka) ->
kafka.
@ -381,7 +383,7 @@ lookup(Type, Name) ->
{error, not_found};
#{<<"connector">> := BridgeConnector} = RawConf ->
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(Type), BridgeConnector
?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector
),
InstanceData =
case emqx_resource:get_instance(ConnectorId) of
@ -402,7 +404,7 @@ lookup(Type, Name) ->
lookup_and_transform_to_bridge_v1(Type, Name) ->
case lookup(Type, Name) of
{ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} ->
ConnectorType = bridge_v2_type_to_connector_type(Type),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
case emqx_connector:lookup(ConnectorType, ConnectorName) of
{ok, Connector} ->
lookup_and_transform_to_bridge_v1_helper(
@ -492,7 +494,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
%% Create fake global config for the transformation and then call
%% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1
ConnectorType = bridge_v2_type_to_connector_type(BridgeType),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType),
%% Needed so name confligts will ba avoided
CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}),
FakeGlobalConfig = #{
@ -585,13 +587,18 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
#{
connector_type := ConnectorType,
connector_type := _ConnectorType,
connector_name := _NewConnectorName,
connector_conf := ConnectorRawConf,
bridge_v2_type := BridgeType,
bridge_v2_name := BridgeName,
bridge_v2_name := _BridgeName,
bridge_v2_conf := BridgeV2RawConf
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf).
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType),
OnReadyCallback =
fun(ConnectorId) ->
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
@ -614,6 +621,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
end,
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
TypeBin = bin(Type),
TypeAtom = binary_to_existing_atom(TypeBin),
RawConf = #{<<"bridges_v2">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
%% Check config
try
#{bridges_v2 := #{TypeAtom := #{temp_name := _Conf}}} =
hocon_tconf:check_plain(
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
)
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end,
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end.
%% NOTE: This function can cause broken references but it is only called from
%% test cases.
remove(BridgeType, BridgeName) ->
@ -667,7 +702,7 @@ bridge_v1_check_deps_and_remove(
%% Check if there are other channels that depends on the same connector
case connector_has_channels(BridgeType, ConnectorName) of
false ->
ConnectorType = bridge_v2_type_to_connector_type(BridgeType),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType),
emqx_connector:remove(ConnectorType, ConnectorName);
true ->
ok
@ -678,7 +713,7 @@ bridge_v1_check_deps_and_remove(_BridgeType, _BridgeName, _RemoveDeps, Error) ->
Error.
connector_has_channels(BridgeV2Type, ConnectorName) ->
ConnectorType = bridge_v2_type_to_connector_type(BridgeV2Type),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
case emqx_connector_resource:get_channels(ConnectorType, ConnectorName) of
{ok, []} ->
false;
@ -842,7 +877,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) ->
Error ->
throw(Error)
end,
ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeV2Type)),
ConnectorType = bin(?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type)),
<<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>
end.

View File

@ -0,0 +1,156 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
con_mod() ->
emqx_bridge_v2_test_connector.
con_type() ->
test_connector_type.
con_name() ->
my_connector.
bridge_type() ->
test_bridge_type.
con_schema() ->
[
{
con_type(),
hoconsc:mk(
hoconsc:map(name, typerefl:map()),
#{
desc => <<"Test Connector Config">>,
required => false
}
)
}
].
con_config() ->
#{}.
bridge_schema() ->
[
{
bridge_type(),
hoconsc:mk(
hoconsc:map(name, typerefl:map()),
#{
desc => <<"Test Bridge Config">>,
required => false
}
)
}
].
bridge_config() ->
#{
<<"connector">> => con_name()
}.
all() ->
emqx_common_test_helpers:all(?MODULE).
start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge].
init_per_suite(Config) ->
%% Setting up mocks for fake connector and bridge V2
meck:new(emqx_connector_schema, [passthrough, no_link]),
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
meck:new(emqx_bridge_v2_schema, [passthrough, no_link]),
meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
meck:new(emqx_bridge_v2, [passthrough, no_link]),
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
_ = application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps(start_apps()),
[
{mocked_mods, [
emqx_connector_schema,
emqx_connector_resource,
emqx_bridge_v2_schema,
emqx_bridge_v2
]}
| Config
].
end_per_suite(Config) ->
MockedMods = proplists:get_value(mocked_mods, Config),
meck:unload(MockedMods),
emqx_common_test_helpers:stop_apps(start_apps()).
init_per_testcase(_TestCase, Config) ->
%% Create a fake connector
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
Config.
end_per_testcase(_TestCase, Config) ->
%% Remove the fake connector
ok = emqx_connector:remove(con_type(), con_name()),
Config.
t_create_remove(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_create_dry_run(_) ->
ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()).
t_create_dry_run_fail_add_channel(_) ->
Msg = <<"Failed to add channel">>,
OnAddChannel1 = fun() ->
{error, Msg}
end,
Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
OnAddChannel2 = fun() ->
throw(Msg)
end,
Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok.
t_create_dry_run_fail_get_channel_status(_) ->
Msg = <<"Failed to add channel">>,
Fun1 = fun() ->
{error, Msg}
end,
Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
Fun2 = fun() ->
throw(Msg)
end,
Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2},
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok.
t_create_dry_run_connector_does_not_exist(_) ->
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).

View File

@ -0,0 +1,110 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_test_connector).
-behaviour(emqx_resource).
-export([
query_mode/1,
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
query_mode(_Config) ->
sync.
callback_mode() ->
always_sync.
on_start(_InstId, _Config) ->
{ok, #{}}.
on_add_channel(
_InstId,
_State,
_ChannelId,
#{on_add_channel_fun := Fun}
) ->
Fun();
on_add_channel(
_InstId,
State,
ChannelId,
ChannelConfig
) ->
Channels = maps:get(channels, State, #{}),
NewChannels = maps:put(ChannelId, ChannelConfig, Channels),
NewState = maps:put(channels, NewChannels, State),
{ok, NewState}.
on_stop(_InstanceId, _State) ->
ok.
on_remove_channel(
_InstId,
State,
ChannelId
) ->
Channels = maps:get(channels, State, #{}),
NewChannels = maps:remove(ChannelId, Channels),
NewState = maps:put(channels, NewChannels, State),
{ok, NewState}.
on_query(
_InstId,
{_MessageTag, _Message},
_ConnectorState
) ->
throw(not_implemented).
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_query_async(
_InstId,
{_MessageTag, _Message},
_AsyncReplyFn,
_ConnectorState
) ->
throw(not_implemented).
on_get_status(
_InstId,
_State
) ->
connected.
on_get_channel_status(
_ResId,
ChannelId,
State
) ->
Channels = maps:get(channels, State),
ChannelState = maps:get(ChannelId, Channels),
case ChannelState of
#{on_get_channel_status_fun := Fun} ->
Fun();
_ ->
connected
end.

View File

@ -56,16 +56,25 @@ when
-optional_callbacks([connector_config/2]).
-if(?EMQX_RELEASE_EDITION == ee).
connector_to_resource_type(ConnectorType) -> emqx_connector_ee_schema:resource_type(ConnectorType).
connector_to_resource_type(ConnectorType) ->
try
emqx_connector_ee_schema:resource_type(ConnectorType)
catch
_:_ -> connector_to_resource_type_ce(ConnectorType)
end.
connector_impl_module(ConnectorType) ->
emqx_connector_ee_schema:connector_impl_module(ConnectorType).
-else.
connector_to_resource_type(_) -> undefined.
connector_to_resource_type(ConnectorType) -> connector_to_resource_type_ce(ConnectorType).
connector_impl_module(_ConnectorType) -> undefined.
-endif.
connector_to_resource_type_ce(_) -> undefined.
resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>.
@ -166,7 +175,7 @@ create(Type, Name, Conf, Opts) ->
{ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name),
<<"emqx_connector">>,
connector_to_resource_type(Type),
?MODULE:connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, Opts)
),
@ -236,7 +245,7 @@ recreate(Type, Name, Conf, Opts) ->
TypeBin = bin(Type),
emqx_resource:recreate_local(
resource_id(Type, Name),
connector_to_resource_type(Type),
?MODULE:connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, Opts)
).
@ -266,7 +275,7 @@ create_dry_run(Type, Conf0, Callback) ->
{ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
emqx_resource:create_dry_run_local(
TmpName, connector_to_resource_type(Type), ParseConf, Callback
TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback
)
end
catch