diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 18f79a782..28a4a71bc 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -939,7 +939,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% Check if the bridge can be converted to a valid bridge v1 %% %% * The corresponding bridge v2 should exist -%% * The connector for the bridge v2 should have exactly on channel +%% * The connector for the bridge v2 should have exactly one channel is_valid_bridge_v1(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup_conf(BridgeV2Type, BridgeName) of @@ -986,7 +986,7 @@ list_and_transform_to_bridge_v1() -> [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> - case is_valid_bridge_v1(BridgeV1Type, Name) of + case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of true -> Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup(Type, Name) of @@ -1070,7 +1070,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> %% If the bridge v2 does not exist, it is a valid bridge v1 split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); _Conf -> - case is_valid_bridge_v1(BridgeV1Type, BridgeName) of + case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 82b534642..0badc82db 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -29,6 +29,8 @@ post_request/0 ]). +-export([enterprise_api_schemas/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -70,7 +72,7 @@ post_request() -> api_schema("post"). api_schema(Method) -> - EE = enterprise_api_schemas(Method), + EE = ?MODULE:enterprise_api_schemas(Method), hoconsc:union(bridge_api_union(EE)). bridge_api_union(Refs) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl new file mode 100644 index 000000000..ffea4b1ae --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -0,0 +1,657 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v1_compatibility_layer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_mgmt_api_test_util:init_suite(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), + emqx_cth_suite:stop(Apps), + ok. + +app_specs() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_rule_engine + ]. + +init_per_testcase(_TestCase, Config) -> + %% Setting up mocks for fake connector and bridge V2 + setup_mocks(), + ets:new(fun_table_name(), [named_table, public]), + %% Create a fake connector + {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), + [ + {mocked_mods, [ + emqx_connector_schema, + emqx_connector_resource, + + emqx_bridge_v2 + ]} + | Config + ]. + +end_per_testcase(_TestCase, _Config) -> + ets:delete(fun_table_name()), + delete_all_bridges_and_connectors(), + meck:unload(), + emqx_common_test_helpers:call_janitor(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +setup_mocks() -> + MeckOpts = [passthrough, no_link, no_history], + + catch meck:new(emqx_connector_schema, MeckOpts), + meck:expect(emqx_connector_schema, fields, 1, con_schema()), + meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]), + + catch meck:new(emqx_connector_resource, MeckOpts), + meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), + + catch meck:new(emqx_bridge_v2_schema, MeckOpts), + meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()), + + catch meck:new(emqx_bridge_v2, MeckOpts), + meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), + meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), + IsBridgeV2TypeFun = fun(Type) -> + BridgeV2Type = bridge_type(), + BridgeV2TypeBin = bridge_type_bin(), + ct:pal("is_bridge_v2_type mock: ~p", [ + #{ + input_type => Type, + expected => [BridgeV2Type, BridgeV2TypeBin] + } + ]), + case Type of + BridgeV2Type -> true; + BridgeV2TypeBin -> true; + _ -> false + end + end, + meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun), + + catch meck:new(emqx_bridge_v2_schema, MeckOpts), + meck:expect( + emqx_bridge_v2_schema, + enterprise_api_schemas, + 1, + fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end + ), + + ok. + +con_mod() -> + emqx_bridge_v2_test_connector. + +con_type() -> + bridge_type(). + +con_name() -> + my_connector. + +bridge_type() -> + test_bridge_type. + +bridge_type_bin() -> + atom_to_binary(bridge_type(), utf8). + +con_schema() -> + [ + { + con_type(), + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, "connector")), + #{ + desc => <<"Test Connector Config">>, + required => false + } + ) + } + ]. + +fields("connector") -> + [ + {enable, hoconsc:mk(any(), #{})}, + {resource_opts, hoconsc:mk(map(), #{})} + ]; +fields("api_post") -> + [ + {connector, hoconsc:mk(binary(), #{})}, + {name, hoconsc:mk(binary(), #{})}, + {type, hoconsc:mk(bridge_type(), #{})}, + {send_to, hoconsc:mk(atom(), #{})} + | fields("connector") + ]. + +con_config() -> + #{ + <<"enable">> => true, + <<"resource_opts">> => #{ + %% Set this to a low value to make the test run faster + <<"health_check_interval">> => 100 + } + }. + +bridge_schema() -> + bridge_schema(_Opts = #{}). + +bridge_schema(Opts) -> + Type = maps:get(bridge_type, Opts, bridge_type()), + [ + { + Type, + hoconsc:mk( + hoconsc:map(name, typerefl:map()), + #{ + desc => <<"Test Bridge Config">>, + required => false + } + ) + } + ]. + +bridge_config() -> + #{ + <<"connector">> => atom_to_binary(con_name()), + <<"enable">> => true, + <<"send_to">> => registered_process_name(), + <<"resource_opts">> => #{ + <<"resume_interval">> => 100 + } + }. + +fun_table_name() -> + emqx_bridge_v1_compatibility_layer_SUITE_fun_table. + +registered_process_name() -> + my_registered_process. + +delete_all_bridges_and_connectors() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing bridge ~p", [{Type, Name}]), + emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing connector ~p", [{Type, Name}]), + emqx_connector:remove(Type, Name) + end, + emqx_connector:list() + ), + update_root_config(#{}), + ok. + +%% Hocon does not support placing a fun in a config map so we replace it with a string +wrap_fun(Fun) -> + UniqRef = make_ref(), + UniqRefBin = term_to_binary(UniqRef), + UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)), + ets:insert(fun_table_name(), {UniqRefStr, Fun}), + UniqRefStr. + +unwrap_fun(UniqRefStr) -> + ets:lookup_element(fun_table_name(), UniqRefStr, 2). + +update_root_config(RootConf) -> + emqx_conf:update([bridges_v2], RootConf, #{override_to => cluster}). + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + ok = emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ), + %% at some point during the tests, sometimes `emqx_bridge:list()' + %% returns an empty list, but `emqx:get_config([bridges])' returns + %% a bunch of orphan test bridges... + lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()), + emqx_config:put([bridges], #{}), + ok. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +list_bridges_http_api_v1() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("list bridges (http v1)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v1) result:\n ~p", [Res]), + Res. + +list_bridges_http_api_v2() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]), + ct:pal("list bridges (http v2)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v2) result:\n ~p", [Res]), + Res. + +list_connectors_http() -> + Path = emqx_mgmt_api_test_util:api_path(["connectors"]), + ct:pal("list connectors"), + Res = request(get, Path, _Params = []), + ct:pal("list connectors result:\n ~p", [Res]), + Res. + +get_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + ct:pal("get bridge (http v1) (~p)", [#{name => Name}]), + Res = request(get, Path, _Params = []), + ct:pal("get bridge (http v1) (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +get_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]), + ct:pal("get bridge (http v2) (~p)", [#{name => Name}]), + Res = request(get, Path, _Params = []), + ct:pal("get bridge (http v2) (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +get_connector_http(Name) -> + ConnectorId = emqx_connector_resource:connector_id(con_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]), + ct:pal("get connector (~p)", [#{name => Name, id => ConnectorId}]), + Res = request(get, Path, _Params = []), + ct:pal("get connector (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +create_bridge_http_api_v1(Opts) -> + Name = maps:get(name, Opts), + Overrides = maps:get(overrides, Opts, #{}), + BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("creating bridge (http v1): ~p", [Params]), + Res = request(post, Path, Params), + ct:pal("bridge create (http v1) result:\n ~p", [Res]), + Res. + +create_bridge_http_api_v2(Opts) -> + Name = maps:get(name, Opts), + Overrides = maps:get(overrides, Opts, #{}), + BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]), + ct:pal("creating bridge (http v2): ~p", [Params]), + Res = request(post, Path, Params), + ct:pal("bridge create (http v2) result:\n ~p", [Res]), + Res. + +delete_bridge_http_api_v1(Opts) -> + Name = maps:get(name, Opts), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + ct:pal("deleting bridge (http v1)"), + Res = request(delete, Path, _Params = []), + ct:pal("bridge delete (http v1) result:\n ~p", [Res]), + Res. + +delete_bridge_http_api_v2(Opts) -> + Name = maps:get(name, Opts), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]), + ct:pal("deleting bridge (http v2)"), + Res = request(delete, Path, _Params = []), + ct:pal("bridge delete (http v2) result:\n ~p", [Res]), + Res. + +enable_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "true"]), + ct:pal("enabling bridge (http v1)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge enable (http v1) result:\n ~p", [Res]), + Res. + +enable_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "true"]), + ct:pal("enabling bridge (http v2)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge enable (http v2) result:\n ~p", [Res]), + Res. + +disable_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "false"]), + ct:pal("disabling bridge (http v1)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge disable (http v1) result:\n ~p", [Res]), + Res. + +disable_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "false"]), + ct:pal("disabling bridge (http v2)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge disable (http v2) result:\n ~p", [Res]), + Res. + +bridge_operation_http_api_v1(Name, Op0) -> + Op = atom_to_list(Op0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + ct:pal("bridge op ~p (http v1)", [Op]), + Res = request(post, Path, _Params = []), + ct:pal("bridge op ~p (http v1) result:\n ~p", [Op, Res]), + Res. + +bridge_operation_http_api_v2(Name, Op0) -> + Op = atom_to_list(Op0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, Op]), + ct:pal("bridge op ~p (http v2)", [Op]), + Res = request(post, Path, _Params = []), + ct:pal("bridge op ~p (http v2) result:\n ~p", [Op, Res]), + Res. + +bridge_node_operation_http_api_v1(Name, Node0, Op0) -> + Op = atom_to_list(Op0), + Node = atom_to_list(Node0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges", BridgeId, Op]), + ct:pal("bridge node op ~p (http v1)", [{Node, Op}]), + Res = request(post, Path, _Params = []), + ct:pal("bridge node op ~p (http v1) result:\n ~p", [{Node, Op}, Res]), + Res. + +bridge_node_operation_http_api_v2(Name, Node0, Op0) -> + Op = atom_to_list(Op0), + Node = atom_to_list(Node0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges_v2", BridgeId, Op]), + ct:pal("bridge node op ~p (http v2)", [{Node, Op}]), + Res = request(post, Path, _Params = []), + ct:pal("bridge node op ~p (http v2) result:\n ~p", [{Node, Op}, Res]), + Res. + +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +t_scenario_1(_Config) -> + %% =================================================================================== + %% Pre-conditions + %% =================================================================================== + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()), + %% created in the test case init + ?assertMatch({ok, {{_, 200, _}, _, [#{}]}}, list_connectors_http()), + {ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}} = list_connectors_http(), + + %% =================================================================================== + %% Create a single bridge v2. It should still be listed and functional when using v1 + %% APIs. + %% =================================================================================== + NameA = <<"bridgev2a">>, + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + create_bridge_http_api_v1(#{name => NameA}) + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()), + %% created a new one from the v1 API + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start) + ), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)), + + {ok, {{_, 200, _}, _, #{<<"connector">> := GeneratedConnName}}} = get_bridge_http_api_v2(NameA), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + + %% =================================================================================== + %% TODO: Update the bridge using v1 API. + %% =================================================================================== + + %% =================================================================================== + %% Now create a new bridge_v2 pointing to the same connector. It should no longer be + %% functions via v1 API, nor be listed in it. The new bridge must create a new + %% channel, so that this bridge is no longer considered v1. + %% =================================================================================== + NameB = <<"bridgev2b">>, + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + create_bridge_http_api_v2(#{ + name => NameB, overrides => #{<<"connector">> => GeneratedConnName} + }) + ), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch( + {ok, {{_, 200, _}, _, [#{<<"name">> := _}, #{<<"name">> := _}]}}, list_bridges_http_api_v2() + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameB)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameB}}}, get_bridge_http_api_v2(NameB)), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + + ?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameB)), + ?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameB)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, stop)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, start)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, restart)), + + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), stop) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), start) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), restart) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)), + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start) + ), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, node(), start) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)), + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, restart)), + + %% =================================================================================== + %% Delete the 2nd new bridge so it appears again in the V1 API. + %% =================================================================================== + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, + delete_bridge_http_api_v2(#{name => NameB}) + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()), + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + %% =================================================================================== + %% Delete the last bridge using API v1. The generated connector should also be + %% removed. + %% =================================================================================== + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, + delete_bridge_http_api_v1(#{name => NameA}) + ), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()), + %% only the pre-existing one should remain. + ?assertMatch( + {ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}}, + list_connectors_http() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := PreexistentConnectorName}}}, + get_connector_http(PreexistentConnectorName) + ), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_connector_http(GeneratedConnName)), + ?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + ok. diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d006d27c0..f8447fe8d 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -27,6 +27,8 @@ -export([get_response/0, put_request/0, post_request/0]). +-export([connector_type_to_bridge_types/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -143,11 +145,11 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> end. transform_old_style_bridges_to_connector_and_actions_of_type( - {ConnectorType, #{type := {map, name, {ref, ConnectorConfSchemaMod, ConnectorConfSchemaName}}}}, + {ConnectorType, #{type := ?MAP(_Name, ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName))}}, RawConfig ) -> ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName), - BridgeTypes = connector_type_to_bridge_types(ConnectorType), + BridgeTypes = ?MODULE:connector_type_to_bridge_types(ConnectorType), BridgesConfMap = maps:get(<<"bridges">>, RawConfig, #{}), ConnectorsConfMap = maps:get(<<"connectors">>, RawConfig, #{}), BridgeConfigsToTransform1 = @@ -200,7 +202,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type( ). transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) -> - ConnectorFields = fields(connectors), + ConnectorFields = ?MODULE:fields(connectors), NewRawConf = lists:foldl( fun transform_old_style_bridges_to_connector_and_actions_of_type/2, RawConfig,