diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 9f5600cfa..9e03a7d80 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -655,26 +655,47 @@ operation_to_enable(enable) -> true. post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. - RemoveFun = fun uninstall_bridge_v2/3, - CreateFun = fun install_bridge_v2/3, - UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> - uninstall_bridge_v2(Type, Name, OldBridgeConf), - install_bridge_v2(Type, Name, Conf) - end, - Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, - #{ - action => CreateFun, - data => Added, - on_exception_fn => fun emqx_bridge_resource:remove/4 - }, - #{action => UpdateFun, data => Updated} - ]), - ok = unload_message_publish_hook(), - ok = load_message_publish_hook(NewConf), - ?tp(bridge_post_config_update_done, #{}), - Result; + %% new and updated bridges must have their connector references validated + UpdatedConfigs = + lists:map( + fun({{Type, BridgeName}, {_Old, New}}) -> + {Type, BridgeName, New} + end, + maps:to_list(Updated) + ), + AddedConfigs = + lists:map( + fun({{Type, BridgeName}, AddedConf}) -> + {Type, BridgeName, AddedConf} + end, + maps:to_list(Added) + ), + ToValidate = UpdatedConfigs ++ AddedConfigs, + case multi_validate_referenced_connectors(ToValidate) of + ok -> + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + ok = unload_message_publish_hook(), + ok = load_message_publish_hook(NewConf), + ?tp(bridge_post_config_update_done, #{}), + Result; + {error, Error} -> + {error, Error} + end; post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), @@ -683,22 +704,36 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, ?tp(bridge_post_config_update_done, #{}), ok; post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; + %% N.B.: all bridges must use the same field name (`connector`) to define the + %% connector name. + ConnectorName = maps:get(connector, NewConf), + case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of + ok -> + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; + {error, Error} -> + {error, Error} + end; post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> - ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok. + ConnectorName = maps:get(connector, NewConf), + case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of + ok -> + ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; + {error, Error} -> + {error, Error} + end. diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( @@ -1088,3 +1123,51 @@ extract_connector_id_from_bridge_v2_id(Id) -> _X -> error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))}) end. + +to_existing_atom(X) -> + case emqx_utils:safe_to_existing_atom(X, utf8) of + {ok, A} -> A; + {error, _} -> throw(bad_atom) + end. + +validate_referenced_connectors(Type0, ConnectorName0, BridgeName) -> + %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is + %% identical to its matching connector type name. + try + Type = to_existing_atom(Type0), + ConnectorName = to_existing_atom(ConnectorName0), + case emqx_config:get([connectors, Type, ConnectorName], undefined) of + undefined -> + {error, #{ + reason => "connector_not_found_or_wrong_type", + type => Type, + bridge_name => BridgeName, + connector_name => ConnectorName + }}; + _ -> + ok + end + catch + throw:bad_atom -> + {error, #{ + reason => "connector_not_found_or_wrong_type", + type => Type0, + bridge_name => BridgeName, + connector_name => ConnectorName0 + }} + end. + +multi_validate_referenced_connectors(Configs) -> + Pipeline = + lists:map( + fun({Type, BridgeName, #{connector := ConnectorName}}) -> + fun(_) -> validate_referenced_connectors(Type, ConnectorName, BridgeName) end + end, + Configs + ), + case emqx_utils:pipeline(Pipeline, unused, unused) of + {ok, _, _} -> + ok; + {error, Reason, _State} -> + {error, Reason} + end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index cc0a4f18a..71e9fe65c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -21,11 +21,13 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + con_mod() -> emqx_bridge_v2_test_connector. con_type() -> - test_connector_type. + bridge_type(). con_name() -> my_connector. @@ -60,9 +62,13 @@ con_config() -> }. bridge_schema() -> + bridge_schema(_Opts = #{}). + +bridge_schema(Opts) -> + Type = maps:get(bridge_type, Opts, bridge_type()), [ { - bridge_type(), + Type, hoconsc:mk( hoconsc:map(name, typerefl:map()), #{ @@ -94,49 +100,84 @@ all() -> start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge]. -init_per_suite(Config) -> - %% Setting up mocks for fake connector and bridge V2 - meck:new(emqx_connector_schema, [passthrough, no_link]), +setup_mocks() -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + + catch meck:new(emqx_connector_schema, MeckOpts), meck:expect(emqx_connector_schema, fields, 1, con_schema()), - meck:new(emqx_connector_resource, [passthrough, no_link]), + catch meck:new(emqx_connector_resource, MeckOpts), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), - meck:new(emqx_bridge_v2_schema, [passthrough, no_link]), + catch meck:new(emqx_bridge_v2_schema, MeckOpts), meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()), - meck:new(emqx_bridge_v2, [passthrough, no_link]), + catch meck:new(emqx_bridge_v2, MeckOpts), meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), - _ = application:load(emqx_conf), - ok = emqx_common_test_helpers:start_apps(start_apps()), + ok. + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +app_specs() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge + ]. + +init_per_testcase(_TestCase, Config) -> + %% Setting up mocks for fake connector and bridge V2 + setup_mocks(), + ets:new(fun_table_name(), [named_table, public]), + %% Create a fake connector + {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), [ {mocked_mods, [ emqx_connector_schema, emqx_connector_resource, - emqx_bridge_v2_schema, + emqx_bridge_v2 ]} | Config ]. -end_per_suite(Config) -> - MockedMods = proplists:get_value(mocked_mods, Config), - meck:unload(MockedMods), - emqx_common_test_helpers:stop_apps(start_apps()). - -init_per_testcase(_TestCase, Config) -> - ets:new(fun_table_name(), [named_table, public]), - %% Create a fake connector - {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), - Config. - -end_per_testcase(_TestCase, Config) -> +end_per_testcase(_TestCase, _Config) -> ets:delete(fun_table_name()), - %% Remove the fake connector - {ok, _} = emqx_connector:remove(con_type(), con_name()), - Config. + delete_all_bridges_and_connectors(), + meck:unload(), + emqx_common_test_helpers:call_janitor(), + ok. + +delete_all_bridges_and_connectors() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing bridge ~p", [{Type, Name}]), + emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + lists:foreach( + fun(#{name := Name, type := Type}) -> + ct:pal("removing connector ~p", [{Type, Name}]), + emqx_connector:remove(Type, Name) + end, + emqx_connector:list() + ), + emqx_conf:update([bridges_v2], #{}, #{override_to => cluster}), + ok. %% Hocon does not support placing a fun in a config map so we replace it with a string @@ -381,3 +422,94 @@ get_bridge_v2_alarm_cnt() -> (_) -> false end, length(lists:filter(FilterFun, Alarms)). + +t_load_no_matching_connector(_Config) -> + Conf = bridge_config(), + BridgeTypeBin = atom_to_binary(bridge_type()), + BridgeNameBin0 = <<"my_test_bridge_update">>, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeNameBin0, Conf)), + + %% updating to invalid reference + RootConf0 = #{ + BridgeTypeBin => + #{BridgeNameBin0 => Conf#{<<"connector">> := <<"unknown">>}} + }, + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + bridge_name := my_test_bridge_update, + connector_name := unknown, + type := _, + reason := "connector_not_found_or_wrong_type" + }}}, + emqx_conf:update([bridges_v2], RootConf0, #{override_to => cluster}) + ), + + %% creating new with invalid reference + BridgeNameBin1 = <<"my_test_bridge_new">>, + RootConf1 = #{ + BridgeTypeBin => + #{BridgeNameBin1 => Conf#{<<"connector">> := <<"unknown">>}} + }, + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + bridge_name := my_test_bridge_new, + connector_name := unknown, + type := _, + reason := "connector_not_found_or_wrong_type" + }}}, + emqx_conf:update([bridges_v2], RootConf1, #{override_to => cluster}) + ), + + ok. + +t_create_no_matching_connector(_Config) -> + Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>}, + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + bridge_name := _, + connector_name := _, + type := _, + reason := "connector_not_found_or_wrong_type" + }}}, + emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) + ), + ok. + +t_create_wrong_connector_type(_Config) -> + meck:expect( + emqx_bridge_v2_schema, + fields, + 1, + bridge_schema(#{bridge_type => wrong_type}) + ), + Conf = bridge_config(), + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + bridge_name := _, + connector_name := _, + type := wrong_type, + reason := "connector_not_found_or_wrong_type" + }}}, + emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) + ), + ok. + +t_update_connector_not_found(_Config) -> + Conf = bridge_config(), + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>}, + ?assertMatch( + {error, + {post_config_update, _HandlerMod, #{ + bridge_name := _, + connector_name := _, + type := _, + reason := "connector_not_found_or_wrong_type" + }}}, + emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) + ), + ok.