diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 21000d505..5c340c3e8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -71,7 +71,8 @@ start/2, stop/2, restart/2, - reset_metrics/2 + reset_metrics/2, + create_dry_run/2 ]). %% Config Update Handler API @@ -93,7 +94,7 @@ get_channels_for_connector(ConnectorId) -> RelevantBridgeV2Types = [ Type || Type <- RootConf, - bridge_v2_type_to_connector_type(Type) =:= ConnectorType + ?MODULE:bridge_v2_type_to_connector_type(Type) =:= ConnectorType ], lists:flatten([ get_channels_for_connector(ConnectorName, BridgeV2Type) @@ -147,7 +148,7 @@ install_bridge_v2( end, %% If there is a running connector, we need to install the Bridge V2 in it ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config), ok. @@ -170,13 +171,14 @@ uninstall_bridge_v2( ok = emqx_resource:clear_metrics(BridgeV2Id), %% Deinstall from connector ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). get_query_mode(BridgeV2Type, Config) -> CreationOpts = emqx_resource:fetch_creation_opts(Config), - ResourceType = emqx_bridge_resource:bridge_to_resource_type(BridgeV2Type), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), + ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). send_message(BridgeType, BridgeName, Message, QueryOpts0) -> @@ -209,7 +211,7 @@ health_check(BridgeType, BridgeName) -> connector := ConnectorName } -> ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(BridgeType), ConnectorName + ?MODULE:bridge_v2_type_to_connector_type(BridgeType), ConnectorName ), emqx_resource_manager:channel_health_check( ConnectorId, id(BridgeType, BridgeName, ConnectorName) @@ -246,7 +248,7 @@ stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id), ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). @@ -260,7 +262,7 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) - BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), %% Deinstall from connector ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config). @@ -292,7 +294,7 @@ id(BridgeType, BridgeName) -> end. id(BridgeType, BridgeName, ConnectorName) -> - ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeType)), + ConnectorType = bin(?MODULE:bridge_v2_type_to_connector_type(BridgeType)), <<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. @@ -304,7 +306,7 @@ external_id(BridgeType, BridgeName) -> <>. bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) -> - bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin)); + ?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin)); bridge_v2_type_to_connector_type(kafka) -> kafka. @@ -381,7 +383,7 @@ lookup(Type, Name) -> {error, not_found}; #{<<"connector">> := BridgeConnector} = RawConf -> ConnectorId = emqx_connector_resource:resource_id( - bridge_v2_type_to_connector_type(Type), BridgeConnector + ?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector ), InstanceData = case emqx_resource:get_instance(ConnectorId) of @@ -402,7 +404,7 @@ lookup(Type, Name) -> lookup_and_transform_to_bridge_v1(Type, Name) -> case lookup(Type, Name) of {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} -> - ConnectorType = bridge_v2_type_to_connector_type(Type), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, Connector} -> lookup_and_transform_to_bridge_v1_helper( @@ -492,7 +494,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> %% Create fake global config for the transformation and then call %% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1 - ConnectorType = bridge_v2_type_to_connector_type(BridgeType), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), %% Needed so name confligts will ba avoided CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}), FakeGlobalConfig = #{ @@ -585,13 +587,18 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> RawConf = maps:without([<<"name">>], RawConfig0), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), #{ - connector_type := ConnectorType, + connector_type := _ConnectorType, connector_name := _NewConnectorName, connector_conf := ConnectorRawConf, bridge_v2_type := BridgeType, - bridge_v2_name := BridgeName, + bridge_v2_name := _BridgeName, bridge_v2_conf := BridgeV2RawConf } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), + create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf). + +create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> + BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), OnReadyCallback = fun(ConnectorId) -> {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), @@ -614,6 +621,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> end, emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). +create_dry_run(Type, Conf0) -> + Conf1 = maps:without([<<"name">>], Conf0), + TypeBin = bin(Type), + TypeAtom = binary_to_existing_atom(TypeBin), + RawConf = #{<<"bridges_v2">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + %% Check config + try + #{bridges_v2 := #{TypeAtom := #{temp_name := _Conf}}} = + hocon_tconf:check_plain( + emqx_bridge_v2_schema, + RawConf, + #{atom_key => true, required => false} + ) + catch + %% validation errors + throw:Reason1 -> + {error, Reason1} + end, + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end. + %% NOTE: This function can cause broken references but it is only called from %% test cases. remove(BridgeType, BridgeName) -> @@ -667,7 +702,7 @@ bridge_v1_check_deps_and_remove( %% Check if there are other channels that depends on the same connector case connector_has_channels(BridgeType, ConnectorName) of false -> - ConnectorType = bridge_v2_type_to_connector_type(BridgeType), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), emqx_connector:remove(ConnectorType, ConnectorName); true -> ok @@ -678,7 +713,7 @@ bridge_v1_check_deps_and_remove(_BridgeType, _BridgeName, _RemoveDeps, Error) -> Error. connector_has_channels(BridgeV2Type, ConnectorName) -> - ConnectorType = bridge_v2_type_to_connector_type(BridgeV2Type), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), case emqx_connector_resource:get_channels(ConnectorType, ConnectorName) of {ok, []} -> false; @@ -842,7 +877,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) -> Error -> throw(Error) end, - ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeV2Type)), + ConnectorType = bin(?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type)), <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>> end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl new file mode 100644 index 000000000..73e03e7c9 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -0,0 +1,156 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +con_mod() -> + emqx_bridge_v2_test_connector. + +con_type() -> + test_connector_type. + +con_name() -> + my_connector. + +bridge_type() -> + test_bridge_type. + +con_schema() -> + [ + { + con_type(), + hoconsc:mk( + hoconsc:map(name, typerefl:map()), + #{ + desc => <<"Test Connector Config">>, + required => false + } + ) + } + ]. + +con_config() -> + #{}. + +bridge_schema() -> + [ + { + bridge_type(), + hoconsc:mk( + hoconsc:map(name, typerefl:map()), + #{ + desc => <<"Test Bridge Config">>, + required => false + } + ) + } + ]. + +bridge_config() -> + #{ + <<"connector">> => con_name() + }. + +all() -> + emqx_common_test_helpers:all(?MODULE). + +start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge]. + +init_per_suite(Config) -> + %% Setting up mocks for fake connector and bridge V2 + meck:new(emqx_connector_schema, [passthrough, no_link]), + meck:expect(emqx_connector_schema, fields, 1, con_schema()), + + meck:new(emqx_connector_resource, [passthrough, no_link]), + meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), + + meck:new(emqx_bridge_v2_schema, [passthrough, no_link]), + meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()), + + meck:new(emqx_bridge_v2, [passthrough, no_link]), + meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), + + _ = application:load(emqx_conf), + ok = emqx_common_test_helpers:start_apps(start_apps()), + [ + {mocked_mods, [ + emqx_connector_schema, + emqx_connector_resource, + emqx_bridge_v2_schema, + emqx_bridge_v2 + ]} + | Config + ]. + +end_per_suite(Config) -> + MockedMods = proplists:get_value(mocked_mods, Config), + meck:unload(MockedMods), + emqx_common_test_helpers:stop_apps(start_apps()). + +init_per_testcase(_TestCase, Config) -> + %% Create a fake connector + {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), + Config. + +end_per_testcase(_TestCase, Config) -> + %% Remove the fake connector + ok = emqx_connector:remove(con_type(), con_name()), + Config. + +t_create_remove(_) -> + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_create_dry_run(_) -> + ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()). + +t_create_dry_run_fail_add_channel(_) -> + Msg = <<"Failed to add channel">>, + OnAddChannel1 = fun() -> + {error, Msg} + end, + Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, + {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + OnAddChannel2 = fun() -> + throw(Msg) + end, + Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, + {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), + ok. + +t_create_dry_run_fail_get_channel_status(_) -> + Msg = <<"Failed to add channel">>, + Fun1 = fun() -> + {error, Msg} + end, + Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, + {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + Fun2 = fun() -> + throw(Msg) + end, + Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2}, + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), + ok. + +t_create_dry_run_connector_does_not_exist(_) -> + BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>}, + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl new file mode 100644 index 000000000..cda923bde --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -0,0 +1,110 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_test_connector). + +-behaviour(emqx_resource). + +-export([ + query_mode/1, + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_query_async/4, + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +query_mode(_Config) -> + sync. + +callback_mode() -> + always_sync. + +on_start(_InstId, _Config) -> + {ok, #{}}. + +on_add_channel( + _InstId, + _State, + _ChannelId, + #{on_add_channel_fun := Fun} +) -> + Fun(); +on_add_channel( + _InstId, + State, + ChannelId, + ChannelConfig +) -> + Channels = maps:get(channels, State, #{}), + NewChannels = maps:put(ChannelId, ChannelConfig, Channels), + NewState = maps:put(channels, NewChannels, State), + {ok, NewState}. + +on_stop(_InstanceId, _State) -> + ok. + +on_remove_channel( + _InstId, + State, + ChannelId +) -> + Channels = maps:get(channels, State, #{}), + NewChannels = maps:remove(ChannelId, Channels), + NewState = maps:put(channels, NewChannels, State), + {ok, NewState}. + +on_query( + _InstId, + {_MessageTag, _Message}, + _ConnectorState +) -> + throw(not_implemented). + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + +on_query_async( + _InstId, + {_MessageTag, _Message}, + _AsyncReplyFn, + _ConnectorState +) -> + throw(not_implemented). + +on_get_status( + _InstId, + _State +) -> + connected. + +on_get_channel_status( + _ResId, + ChannelId, + State +) -> + Channels = maps:get(channels, State), + ChannelState = maps:get(ChannelId, Channels), + case ChannelState of + #{on_get_channel_status_fun := Fun} -> + Fun(); + _ -> + connected + end. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 2c3bdfa74..2fd440e59 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -56,16 +56,25 @@ when -optional_callbacks([connector_config/2]). -if(?EMQX_RELEASE_EDITION == ee). -connector_to_resource_type(ConnectorType) -> emqx_connector_ee_schema:resource_type(ConnectorType). +connector_to_resource_type(ConnectorType) -> + try + emqx_connector_ee_schema:resource_type(ConnectorType) + catch + _:_ -> connector_to_resource_type_ce(ConnectorType) + end. connector_impl_module(ConnectorType) -> emqx_connector_ee_schema:connector_impl_module(ConnectorType). -else. -connector_to_resource_type(_) -> undefined. + +connector_to_resource_type(ConnectorType) -> connector_to_resource_type_ce(ConnectorType). connector_impl_module(_ConnectorType) -> undefined. + -endif. +connector_to_resource_type_ce(_) -> undefined. + resource_id(ConnectorId) when is_binary(ConnectorId) -> <<"connector:", ConnectorId/binary>>. @@ -166,7 +175,7 @@ create(Type, Name, Conf, Opts) -> {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), <<"emqx_connector">>, - connector_to_resource_type(Type), + ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) ), @@ -236,7 +245,7 @@ recreate(Type, Name, Conf, Opts) -> TypeBin = bin(Type), emqx_resource:recreate_local( resource_id(Type, Name), - connector_to_resource_type(Type), + ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) ). @@ -266,7 +275,7 @@ create_dry_run(Type, Conf0, Callback) -> {ok, ConfNew} -> ParseConf = parse_confs(bin(Type), TmpName, ConfNew), emqx_resource:create_dry_run_local( - TmpName, connector_to_resource_type(Type), ParseConf, Callback + TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback ) end catch