From c90ca1ea5322b72de680d65d7d541ce198515c38 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 31 Oct 2023 10:40:06 -0300 Subject: [PATCH 1/5] fix(bridge_v1): always delete connector when deleting v1 bridge Fixes https://emqx.atlassian.net/browse/EMQX-11287 --- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 945ff250c..4175da910 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -507,7 +507,7 @@ schema("/bridges_probe") -> case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of <<"true">> -> [rule_actions, connector]; true -> [rule_actions, connector]; - _ -> [] + _ -> [connector] end, case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDelete) of ok -> From 84e78f5d2e5101526952f1f99e5b68d6e328361e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 31 Oct 2023 10:40:56 -0300 Subject: [PATCH 2/5] fix(bridge_api): fix response status code for some operations --- apps/emqx_bridge/src/emqx_bridge_api.erl | 5 +++++ apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 4175da910..eb0efcb14 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -387,6 +387,7 @@ schema("/bridges/:id/enable/:enable") -> responses => #{ 204 => <<"Success">>, + 400 => error_schema('BAD_REQUEST', non_compat_bridge_msg()), 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } @@ -667,6 +668,10 @@ get_metrics_from_local_node(BridgeType0, BridgeName) -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); {error, timeout} -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, not_bridge_v1_compatible} -> + ?BAD_REQUEST(non_compat_bridge_msg()); + {error, bridge_not_found} -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); {error, Reason} -> ?INTERNAL_ERROR(Reason) end diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 5adfa8f0c..da6c697a1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -394,7 +394,7 @@ schema("/bridges_v2_probe") -> case emqx_bridge_v2:disable_enable(enable_func(Enable), BridgeType, BridgeName) of {ok, _} -> ?NO_CONTENT; - {error, {pre_config_update, _, not_found}} -> + {error, {pre_config_update, _, bridge_not_found}} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); {error, {_, _, timeout}} -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); From 8eb822d898ddc66b5397a247b6f7acdf033fb181 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 31 Oct 2023 10:46:00 -0300 Subject: [PATCH 3/5] test(bridges): add bridge v1 compatibility layer test suite --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 6 +- .../src/schema/emqx_bridge_v2_schema.erl | 4 +- ...qx_bridge_v1_compatibility_layer_SUITE.erl | 657 ++++++++++++++++++ .../src/schema/emqx_connector_schema.erl | 8 +- 4 files changed, 668 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 18f79a782..28a4a71bc 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -939,7 +939,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% Check if the bridge can be converted to a valid bridge v1 %% %% * The corresponding bridge v2 should exist -%% * The connector for the bridge v2 should have exactly on channel +%% * The connector for the bridge v2 should have exactly one channel is_valid_bridge_v1(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup_conf(BridgeV2Type, BridgeName) of @@ -986,7 +986,7 @@ list_and_transform_to_bridge_v1() -> [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> - case is_valid_bridge_v1(BridgeV1Type, Name) of + case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of true -> Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup(Type, Name) of @@ -1070,7 +1070,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> %% If the bridge v2 does not exist, it is a valid bridge v1 split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); _Conf -> - case is_valid_bridge_v1(BridgeV1Type, BridgeName) of + case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 82b534642..0badc82db 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -29,6 +29,8 @@ post_request/0 ]). +-export([enterprise_api_schemas/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -70,7 +72,7 @@ post_request() -> api_schema("post"). api_schema(Method) -> - EE = enterprise_api_schemas(Method), + EE = ?MODULE:enterprise_api_schemas(Method), hoconsc:union(bridge_api_union(EE)). bridge_api_union(Refs) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl new file mode 100644 index 000000000..ffea4b1ae --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -0,0 +1,657 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v1_compatibility_layer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_mgmt_api_test_util:init_suite(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), + emqx_cth_suite:stop(Apps), + ok. + +app_specs() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_rule_engine + ]. + +init_per_testcase(_TestCase, Config) -> + %% Setting up mocks for fake connector and bridge V2 + setup_mocks(), + ets:new(fun_table_name(), [named_table, public]), + %% Create a fake connector + {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), + [ + {mocked_mods, [ + emqx_connector_schema, + emqx_connector_resource, + + emqx_bridge_v2 + ]} + | Config + ]. + +end_per_testcase(_TestCase, _Config) -> + ets:delete(fun_table_name()), + delete_all_bridges_and_connectors(), + meck:unload(), + emqx_common_test_helpers:call_janitor(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +setup_mocks() -> + MeckOpts = [passthrough, no_link, no_history], + + catch meck:new(emqx_connector_schema, MeckOpts), + meck:expect(emqx_connector_schema, fields, 1, con_schema()), + meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]), + + catch meck:new(emqx_connector_resource, MeckOpts), + meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), + + catch meck:new(emqx_bridge_v2_schema, MeckOpts), + meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()), + + catch meck:new(emqx_bridge_v2, MeckOpts), + meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), + meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), + IsBridgeV2TypeFun = fun(Type) -> + BridgeV2Type = bridge_type(), + BridgeV2TypeBin = bridge_type_bin(), + ct:pal("is_bridge_v2_type mock: ~p", [ + #{ + input_type => Type, + expected => [BridgeV2Type, BridgeV2TypeBin] + } + ]), + case Type of + BridgeV2Type -> true; + BridgeV2TypeBin -> true; + _ -> false + end + end, + meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun), + + catch meck:new(emqx_bridge_v2_schema, MeckOpts), + meck:expect( + emqx_bridge_v2_schema, + enterprise_api_schemas, + 1, + fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end + ), + + ok. + +con_mod() -> + emqx_bridge_v2_test_connector. + +con_type() -> + bridge_type(). + +con_name() -> + my_connector. + +bridge_type() -> + test_bridge_type. + +bridge_type_bin() -> + atom_to_binary(bridge_type(), utf8). + +con_schema() -> + [ + { + con_type(), + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, "connector")), + #{ + desc => <<"Test Connector Config">>, + required => false + } + ) + } + ]. + +fields("connector") -> + [ + {enable, hoconsc:mk(any(), #{})}, + {resource_opts, hoconsc:mk(map(), #{})} + ]; +fields("api_post") -> + [ + {connector, hoconsc:mk(binary(), #{})}, + {name, hoconsc:mk(binary(), #{})}, + {type, hoconsc:mk(bridge_type(), #{})}, + {send_to, hoconsc:mk(atom(), #{})} + | fields("connector") + ]. + +con_config() -> + #{ + <<"enable">> => true, + <<"resource_opts">> => #{ + %% Set this to a low value to make the test run faster + <<"health_check_interval">> => 100 + } + }. + +bridge_schema() -> + bridge_schema(_Opts = #{}). + +bridge_schema(Opts) -> + Type = maps:get(bridge_type, Opts, bridge_type()), + [ + { + Type, + hoconsc:mk( + hoconsc:map(name, typerefl:map()), + #{ + desc => <<"Test Bridge Config">>, + required => false + } + ) + } + ]. + +bridge_config() -> + #{ + <<"connector">> => atom_to_binary(con_name()), + <<"enable">> => true, + <<"send_to">> => registered_process_name(), + <<"resource_opts">> => #{ + <<"resume_interval">> => 100 + } + }. + +fun_table_name() -> + emqx_bridge_v1_compatibility_layer_SUITE_fun_table. + +registered_process_name() -> + my_registered_process. + +delete_all_bridges_and_connectors() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing bridge ~p", [{Type, Name}]), + emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing connector ~p", [{Type, Name}]), + emqx_connector:remove(Type, Name) + end, + emqx_connector:list() + ), + update_root_config(#{}), + ok. + +%% Hocon does not support placing a fun in a config map so we replace it with a string +wrap_fun(Fun) -> + UniqRef = make_ref(), + UniqRefBin = term_to_binary(UniqRef), + UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)), + ets:insert(fun_table_name(), {UniqRefStr, Fun}), + UniqRefStr. + +unwrap_fun(UniqRefStr) -> + ets:lookup_element(fun_table_name(), UniqRefStr, 2). + +update_root_config(RootConf) -> + emqx_conf:update([bridges_v2], RootConf, #{override_to => cluster}). + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + ok = emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ), + %% at some point during the tests, sometimes `emqx_bridge:list()' + %% returns an empty list, but `emqx:get_config([bridges])' returns + %% a bunch of orphan test bridges... + lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()), + emqx_config:put([bridges], #{}), + ok. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +list_bridges_http_api_v1() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("list bridges (http v1)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v1) result:\n ~p", [Res]), + Res. + +list_bridges_http_api_v2() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]), + ct:pal("list bridges (http v2)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v2) result:\n ~p", [Res]), + Res. + +list_connectors_http() -> + Path = emqx_mgmt_api_test_util:api_path(["connectors"]), + ct:pal("list connectors"), + Res = request(get, Path, _Params = []), + ct:pal("list connectors result:\n ~p", [Res]), + Res. + +get_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + ct:pal("get bridge (http v1) (~p)", [#{name => Name}]), + Res = request(get, Path, _Params = []), + ct:pal("get bridge (http v1) (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +get_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]), + ct:pal("get bridge (http v2) (~p)", [#{name => Name}]), + Res = request(get, Path, _Params = []), + ct:pal("get bridge (http v2) (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +get_connector_http(Name) -> + ConnectorId = emqx_connector_resource:connector_id(con_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]), + ct:pal("get connector (~p)", [#{name => Name, id => ConnectorId}]), + Res = request(get, Path, _Params = []), + ct:pal("get connector (~p) result:\n ~p", [#{name => Name}, Res]), + Res. + +create_bridge_http_api_v1(Opts) -> + Name = maps:get(name, Opts), + Overrides = maps:get(overrides, Opts, #{}), + BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("creating bridge (http v1): ~p", [Params]), + Res = request(post, Path, Params), + ct:pal("bridge create (http v1) result:\n ~p", [Res]), + Res. + +create_bridge_http_api_v2(Opts) -> + Name = maps:get(name, Opts), + Overrides = maps:get(overrides, Opts, #{}), + BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]), + ct:pal("creating bridge (http v2): ~p", [Params]), + Res = request(post, Path, Params), + ct:pal("bridge create (http v2) result:\n ~p", [Res]), + Res. + +delete_bridge_http_api_v1(Opts) -> + Name = maps:get(name, Opts), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + ct:pal("deleting bridge (http v1)"), + Res = request(delete, Path, _Params = []), + ct:pal("bridge delete (http v1) result:\n ~p", [Res]), + Res. + +delete_bridge_http_api_v2(Opts) -> + Name = maps:get(name, Opts), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]), + ct:pal("deleting bridge (http v2)"), + Res = request(delete, Path, _Params = []), + ct:pal("bridge delete (http v2) result:\n ~p", [Res]), + Res. + +enable_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "true"]), + ct:pal("enabling bridge (http v1)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge enable (http v1) result:\n ~p", [Res]), + Res. + +enable_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "true"]), + ct:pal("enabling bridge (http v2)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge enable (http v2) result:\n ~p", [Res]), + Res. + +disable_bridge_http_api_v1(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "false"]), + ct:pal("disabling bridge (http v1)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge disable (http v1) result:\n ~p", [Res]), + Res. + +disable_bridge_http_api_v2(Name) -> + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "false"]), + ct:pal("disabling bridge (http v2)"), + Res = request(put, Path, _Params = []), + ct:pal("bridge disable (http v2) result:\n ~p", [Res]), + Res. + +bridge_operation_http_api_v1(Name, Op0) -> + Op = atom_to_list(Op0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + ct:pal("bridge op ~p (http v1)", [Op]), + Res = request(post, Path, _Params = []), + ct:pal("bridge op ~p (http v1) result:\n ~p", [Op, Res]), + Res. + +bridge_operation_http_api_v2(Name, Op0) -> + Op = atom_to_list(Op0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, Op]), + ct:pal("bridge op ~p (http v2)", [Op]), + Res = request(post, Path, _Params = []), + ct:pal("bridge op ~p (http v2) result:\n ~p", [Op, Res]), + Res. + +bridge_node_operation_http_api_v1(Name, Node0, Op0) -> + Op = atom_to_list(Op0), + Node = atom_to_list(Node0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges", BridgeId, Op]), + ct:pal("bridge node op ~p (http v1)", [{Node, Op}]), + Res = request(post, Path, _Params = []), + ct:pal("bridge node op ~p (http v1) result:\n ~p", [{Node, Op}, Res]), + Res. + +bridge_node_operation_http_api_v2(Name, Node0, Op0) -> + Op = atom_to_list(Op0), + Node = atom_to_list(Node0), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges_v2", BridgeId, Op]), + ct:pal("bridge node op ~p (http v2)", [{Node, Op}]), + Res = request(post, Path, _Params = []), + ct:pal("bridge node op ~p (http v2) result:\n ~p", [{Node, Op}, Res]), + Res. + +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +t_scenario_1(_Config) -> + %% =================================================================================== + %% Pre-conditions + %% =================================================================================== + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()), + %% created in the test case init + ?assertMatch({ok, {{_, 200, _}, _, [#{}]}}, list_connectors_http()), + {ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}} = list_connectors_http(), + + %% =================================================================================== + %% Create a single bridge v2. It should still be listed and functional when using v1 + %% APIs. + %% =================================================================================== + NameA = <<"bridgev2a">>, + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + create_bridge_http_api_v1(#{name => NameA}) + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()), + %% created a new one from the v1 API + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start) + ), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)), + + {ok, {{_, 200, _}, _, #{<<"connector">> := GeneratedConnName}}} = get_bridge_http_api_v2(NameA), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + + %% =================================================================================== + %% TODO: Update the bridge using v1 API. + %% =================================================================================== + + %% =================================================================================== + %% Now create a new bridge_v2 pointing to the same connector. It should no longer be + %% functions via v1 API, nor be listed in it. The new bridge must create a new + %% channel, so that this bridge is no longer considered v1. + %% =================================================================================== + NameB = <<"bridgev2b">>, + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + create_bridge_http_api_v2(#{ + name => NameB, overrides => #{<<"connector">> => GeneratedConnName} + }) + ), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch( + {ok, {{_, 200, _}, _, [#{<<"name">> := _}, #{<<"name">> := _}]}}, list_bridges_http_api_v2() + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameB)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameB}}}, get_bridge_http_api_v2(NameB)), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + + ?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameB)), + ?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameB)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, stop)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, start)), + ?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, restart)), + + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), stop) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), start) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), restart) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)), + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, stop)), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start) + ), + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, node(), start) + ), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)), + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, restart)), + + %% =================================================================================== + %% Delete the 2nd new bridge so it appears again in the V1 API. + %% =================================================================================== + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, + delete_bridge_http_api_v2(#{name => NameB}) + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()), + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}}, + get_connector_http(GeneratedConnName) + ), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + %% =================================================================================== + %% Delete the last bridge using API v1. The generated connector should also be + %% removed. + %% =================================================================================== + ?assertMatch( + {ok, {{_, 204, _}, _, _}}, + delete_bridge_http_api_v1(#{name => NameA}) + ), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()), + %% only the pre-existing one should remain. + ?assertMatch( + {ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}}, + list_connectors_http() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"name">> := PreexistentConnectorName}}}, + get_connector_http(PreexistentConnectorName) + ), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, get_connector_http(GeneratedConnName)), + ?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v1(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v2(NameA)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)), + ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)), + %% TODO: currently, only `start' op is supported by the v2 API. + %% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)), + + ok. diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d006d27c0..f8447fe8d 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -27,6 +27,8 @@ -export([get_response/0, put_request/0, post_request/0]). +-export([connector_type_to_bridge_types/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -143,11 +145,11 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> end. transform_old_style_bridges_to_connector_and_actions_of_type( - {ConnectorType, #{type := {map, name, {ref, ConnectorConfSchemaMod, ConnectorConfSchemaName}}}}, + {ConnectorType, #{type := ?MAP(_Name, ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName))}}, RawConfig ) -> ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName), - BridgeTypes = connector_type_to_bridge_types(ConnectorType), + BridgeTypes = ?MODULE:connector_type_to_bridge_types(ConnectorType), BridgesConfMap = maps:get(<<"bridges">>, RawConfig, #{}), ConnectorsConfMap = maps:get(<<"connectors">>, RawConfig, #{}), BridgeConfigsToTransform1 = @@ -200,7 +202,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type( ). transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) -> - ConnectorFields = fields(connectors), + ConnectorFields = ?MODULE:fields(connectors), NewRawConf = lists:foldl( fun transform_old_style_bridges_to_connector_and_actions_of_type/2, RawConfig, From b420b53075db09a1e02f4d8b7e2f3db6fecfed28 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 31 Oct 2023 12:59:12 -0300 Subject: [PATCH 4/5] fix(bridges_v2): check bridge name length before attempting atom conversion Fixes https://emqx.atlassian.net/browse/EMQX-11289 --- apps/emqx/src/emqx_config_handler.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 6 ++---- .../test/emqx_bridge_v1_compatibility_layer_SUITE.erl | 9 +++++++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index bf11c17e8..d8c014b8e 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -703,7 +703,7 @@ atom(Bin) when is_binary(Bin), size(Bin) > 255 -> erlang:throw( iolist_to_binary( io_lib:format( - "Name is is too long." + "Name is too long." " Please provide a shorter name (<= 255 bytes)." " The name that is too long: \"~s\"", [Bin] diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 28a4a71bc..20e345475 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1092,15 +1092,13 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) -> bridge_v2_conf := NewBridgeV2RawConf } = split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf), - %% TODO should we really create an atom here? - ConnectorNameAtom = binary_to_atom(NewConnectorName), - case emqx_connector:create(ConnectorType, ConnectorNameAtom, NewConnectorRawConf) of + case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of {ok, _} -> case create(BridgeType, BridgeName, NewBridgeV2RawConf) of {ok, _} = Result -> Result; {error, Reason1} -> - case emqx_connector:remove(ConnectorType, ConnectorNameAtom) of + case emqx_connector:remove(ConnectorType, NewConnectorName) of ok -> {error, Reason1}; {error, Reason2} -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index ffea4b1ae..5dd2a0280 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -444,6 +444,15 @@ bridge_node_operation_http_api_v2(Name, Node0, Op0) -> %% Test cases %%------------------------------------------------------------------------------ +t_name_too_long(_Config) -> + LongName = list_to_binary(lists:duplicate(256, $a)), + ?assertMatch( + {error, + {{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"Name is too long", _/binary>>}}}}, + create_bridge_http_api_v1(#{name => LongName}) + ), + ok. + t_scenario_1(_Config) -> %% =================================================================================== %% Pre-conditions From 45a39d97c6c9a09dcbf5a5fed3693c4753794046 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 31 Oct 2023 16:39:47 -0300 Subject: [PATCH 5/5] fix(bridges_v1): avoid create dangling connectors when updating bridges via api v1 Fixes https://emqx.atlassian.net/browse/EMQX-11291 --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 71 ++++++++++--------- ...qx_bridge_v1_compatibility_layer_SUITE.erl | 33 ++++++--- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 9 +-- .../src/schema/emqx_connector_schema.erl | 45 +++++++----- apps/emqx_utils/src/emqx_utils_maps.erl | 8 ++- 5 files changed, 99 insertions(+), 67 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 20e345475..3747a4671 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1068,21 +1068,29 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> case lookup_conf(BridgeV2Type, BridgeName) of {error, _} -> %% If the bridge v2 does not exist, it is a valid bridge v1 - split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); + PreviousRawConf = undefined, + split_bridge_v1_config_and_create_helper( + BridgeV1Type, BridgeName, RawConf, PreviousRawConf + ); _Conf -> case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], + PreviousRawConf = emqx:get_raw_config( + [?ROOT_KEY, BridgeV2Type, BridgeName], undefined + ), bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps), - split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); + split_bridge_v1_config_and_create_helper( + BridgeV1Type, BridgeName, RawConf, PreviousRawConf + ); false -> %% If the bridge v2 exists, it is not a valid bridge v1 {error, non_compatible_bridge_v2_exists} end end. -split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) -> +split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) -> #{ connector_type := ConnectorType, connector_name := NewConnectorName, @@ -1091,7 +1099,7 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) -> bridge_v2_name := BridgeName, bridge_v2_conf := NewBridgeV2RawConf } = - split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf), + split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf), case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of {ok, _} -> case create(BridgeType, BridgeName, NewBridgeV2RawConf) of @@ -1116,14 +1124,14 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) -> Error end. -split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> +split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) -> %% Create fake global config for the transformation and then call - %% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1 + %% `emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1' BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), ConnectorType = connector_type(BridgeV2Type), - %% Needed so name confligts will ba avoided + %% Needed to avoid name conflicts CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}), - FakeGlobalConfig = #{ + FakeGlobalConfig0 = #{ <<"connectors">> => CurrentConnectorsConfig, <<"bridges">> => #{ bin(BridgeV1Type) => #{ @@ -1131,6 +1139,13 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> } } }, + FakeGlobalConfig = + emqx_utils_maps:put_if( + FakeGlobalConfig0, + bin(?ROOT_KEY), + #{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}}, + PreviousRawConf =/= undefined + ), Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2( FakeGlobalConfig ), @@ -1143,34 +1158,21 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> ], Output ), - ConnectorsBefore = - maps:keys( - emqx_utils_maps:deep_get( - [ - <<"connectors">>, - bin(ConnectorType) - ], - FakeGlobalConfig, - #{} - ) - ), - ConnectorsAfter = - maps:keys( - emqx_utils_maps:deep_get( - [ - <<"connectors">>, - bin(ConnectorType) - ], - Output - ) - ), - [NewConnectorName] = ConnectorsAfter -- ConnectorsBefore, + ConnectorName = emqx_utils_maps:deep_get( + [ + bin(?ROOT_KEY), + bin(BridgeV2Type), + bin(BridgeName), + <<"connector">> + ], + Output + ), NewConnectorRawConf = emqx_utils_maps:deep_get( [ <<"connectors">>, bin(ConnectorType), - bin(NewConnectorName) + bin(ConnectorName) ], Output ), @@ -1178,7 +1180,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> NewFakeGlobalConfig = #{ <<"connectors">> => #{ bin(ConnectorType) => #{ - bin(NewConnectorName) => NewConnectorRawConf + bin(ConnectorName) => NewConnectorRawConf } }, <<"bridges_v2">> => #{ @@ -1197,7 +1199,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> _ -> #{ connector_type => ConnectorType, - connector_name => NewConnectorName, + connector_name => ConnectorName, connector_conf => NewConnectorRawConf, bridge_v2_type => BridgeV2Type, bridge_v2_name => BridgeName, @@ -1212,6 +1214,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> bridge_v1_create_dry_run(BridgeType, RawConfig0) -> RawConf = maps:without([<<"name">>], RawConfig0), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), + PreviousRawConf = undefined, #{ connector_type := _ConnectorType, connector_name := _NewConnectorName, @@ -1219,7 +1222,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> bridge_v2_type := BridgeV2Type, bridge_v2_name := _BridgeName, bridge_v2_conf := BridgeV2RawConf - } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), + } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index 5dd2a0280..bfc3eedc5 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -100,12 +100,6 @@ setup_mocks() -> IsBridgeV2TypeFun = fun(Type) -> BridgeV2Type = bridge_type(), BridgeV2TypeBin = bridge_type_bin(), - ct:pal("is_bridge_v2_type mock: ~p", [ - #{ - input_type => Type, - expected => [BridgeV2Type, BridgeV2TypeBin] - } - ]), case Type of BridgeV2Type -> true; BridgeV2TypeBin -> true; @@ -333,7 +327,8 @@ get_connector_http(Name) -> create_bridge_http_api_v1(Opts) -> Name = maps:get(name, Opts), Overrides = maps:get(overrides, Opts, #{}), - BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + BridgeConfig = maps:without([<<"connector">>], BridgeConfig0), Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, Path = emqx_mgmt_api_test_util:api_path(["bridges"]), ct:pal("creating bridge (http v1): ~p", [Params]), @@ -352,6 +347,19 @@ create_bridge_http_api_v2(Opts) -> ct:pal("bridge create (http v2) result:\n ~p", [Res]), Res. +update_bridge_http_api_v1(Opts) -> + Name = maps:get(name, Opts), + BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), + Overrides = maps:get(overrides, Opts, #{}), + BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides), + BridgeConfig = maps:without([<<"connector">>], BridgeConfig0), + Params = BridgeConfig, + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + ct:pal("updating bridge (http v1): ~p", [Params]), + Res = request(put, Path, Params), + ct:pal("bridge update (http v1) result:\n ~p", [Res]), + Res. + delete_bridge_http_api_v1(Opts) -> Name = maps:get(name, Opts), BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name), @@ -515,8 +523,17 @@ t_scenario_1(_Config) -> ), %% =================================================================================== - %% TODO: Update the bridge using v1 API. + %% Update the bridge using v1 API. %% =================================================================================== + ?assertMatch( + {ok, {{_, 200, _}, _, _}}, + update_bridge_http_api_v1(#{name => NameA}) + ), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()), + ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()), + ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)), %% =================================================================================== %% Now create a new bridge_v2 pointing to the same connector. It should no longer be diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index dc5eb01aa..cd7568001 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -222,13 +222,8 @@ encode_payload(State, Selected) -> OrderingKey = render_key(OrderingKeyTemplate, Selected), Attributes = proc_attributes(AttributesTemplate, Selected), Payload0 = #{data => base64:encode(Data)}, - Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0), - put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>). - -put_if(Acc, K, V, true) -> - Acc#{K => V}; -put_if(Acc, _K, _V, false) -> - Acc. + Payload1 = emqx_utils_maps:put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0), + emqx_utils_maps:put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>). -spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary(). render_payload([] = _Template, Selected) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f8447fe8d..4d803fd79 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -71,21 +71,31 @@ has_connector_field(BridgeConf, ConnectorFields) -> ConnectorFields ). -bridge_configs_to_transform(_BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields) -> +bridge_configs_to_transform( + _BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields, _RawConfig +) -> []; -bridge_configs_to_transform(BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields) -> +bridge_configs_to_transform( + BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields, RawConfig +) -> case has_connector_field(BridgeConf, ConnectorFields) of true -> + PreviousRawConfig = + emqx_utils_maps:deep_get( + [<<"bridges_v2">>, to_bin(BridgeType), to_bin(BridgeName)], + RawConfig, + undefined + ), [ - {BridgeType, BridgeName, BridgeConf, ConnectorFields} - | bridge_configs_to_transform(BridgeType, Rest, ConnectorFields) + {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig} + | bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig) ]; false -> - bridge_configs_to_transform(BridgeType, Rest, ConnectorFields) + bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig) end. split_bridge_to_connector_and_action( - {ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields}} + {ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}} ) -> %% Get connector fields from bridge config ConnectorMap = lists:foldl( @@ -122,8 +132,12 @@ split_bridge_to_connector_and_action( BridgeConf, ConnectorFields ), - %% Generate a connector name - ConnectorName = generate_connector_name(ConnectorsMap, BridgeName, 0), + %% Generate a connector name, if needed. Avoid doing so if there was a previous config. + ConnectorName = + case PreviousRawConfig of + #{<<"connector">> := ConnectorName0} -> ConnectorName0; + _ -> generate_connector_name(ConnectorsMap, BridgeName, 0) + end, %% Add connector field to action map ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0), {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. @@ -152,20 +166,17 @@ transform_old_style_bridges_to_connector_and_actions_of_type( BridgeTypes = ?MODULE:connector_type_to_bridge_types(ConnectorType), BridgesConfMap = maps:get(<<"bridges">>, RawConfig, #{}), ConnectorsConfMap = maps:get(<<"connectors">>, RawConfig, #{}), - BridgeConfigsToTransform1 = - lists:foldl( - fun(BridgeType, ToTranformSoFar) -> + BridgeConfigsToTransform = + lists:flatmap( + fun(BridgeType) -> BridgeNameToBridgeMap = maps:get(to_bin(BridgeType), BridgesConfMap, #{}), BridgeNameBridgeConfList = maps:to_list(BridgeNameToBridgeMap), - NewToTransform = bridge_configs_to_transform( - BridgeType, BridgeNameBridgeConfList, ConnectorFields - ), - [NewToTransform, ToTranformSoFar] + bridge_configs_to_transform( + BridgeType, BridgeNameBridgeConfList, ConnectorFields, RawConfig + ) end, - [], BridgeTypes ), - BridgeConfigsToTransform = lists:flatten(BridgeConfigsToTransform1), ConnectorsWithTypeMap = maps:get(to_bin(ConnectorType), ConnectorsConfMap, #{}), BridgeConfigsToTransformWithConnectorConf = lists:zip( lists:duplicate(length(BridgeConfigsToTransform), ConnectorsWithTypeMap), diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index d49c90e53..3945b7201 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -33,7 +33,8 @@ diff_maps/2, best_effort_recursive_sum/3, if_only_to_toggle_enable/2, - update_if_present/3 + update_if_present/3, + put_if/4 ]). -export_type([config_key/0, config_key_path/0]). @@ -303,3 +304,8 @@ update_if_present(Key, Fun, Map) -> _ -> Map end. + +put_if(Acc, K, V, true) -> + Acc#{K => V}; +put_if(Acc, _K, _V, false) -> + Acc.