feat(bridge_v2): validate connector references when creating/updating bridges
This commit is contained in:
parent
04a832a80a
commit
f2c9739ce2
|
@ -655,26 +655,47 @@ operation_to_enable(enable) -> true.
|
|||
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
||||
#{added := Added, removed := Removed, changed := Updated} =
|
||||
diff_confs(NewConf, OldConf),
|
||||
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
||||
RemoveFun = fun uninstall_bridge_v2/3,
|
||||
CreateFun = fun install_bridge_v2/3,
|
||||
UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
|
||||
uninstall_bridge_v2(Type, Name, OldBridgeConf),
|
||||
install_bridge_v2(Type, Name, Conf)
|
||||
end,
|
||||
Result = perform_bridge_changes([
|
||||
#{action => RemoveFun, data => Removed},
|
||||
#{
|
||||
action => CreateFun,
|
||||
data => Added,
|
||||
on_exception_fn => fun emqx_bridge_resource:remove/4
|
||||
},
|
||||
#{action => UpdateFun, data => Updated}
|
||||
]),
|
||||
ok = unload_message_publish_hook(),
|
||||
ok = load_message_publish_hook(NewConf),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
Result;
|
||||
%% new and updated bridges must have their connector references validated
|
||||
UpdatedConfigs =
|
||||
lists:map(
|
||||
fun({{Type, BridgeName}, {_Old, New}}) ->
|
||||
{Type, BridgeName, New}
|
||||
end,
|
||||
maps:to_list(Updated)
|
||||
),
|
||||
AddedConfigs =
|
||||
lists:map(
|
||||
fun({{Type, BridgeName}, AddedConf}) ->
|
||||
{Type, BridgeName, AddedConf}
|
||||
end,
|
||||
maps:to_list(Added)
|
||||
),
|
||||
ToValidate = UpdatedConfigs ++ AddedConfigs,
|
||||
case multi_validate_referenced_connectors(ToValidate) of
|
||||
ok ->
|
||||
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
||||
RemoveFun = fun uninstall_bridge_v2/3,
|
||||
CreateFun = fun install_bridge_v2/3,
|
||||
UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
|
||||
uninstall_bridge_v2(Type, Name, OldBridgeConf),
|
||||
install_bridge_v2(Type, Name, Conf)
|
||||
end,
|
||||
Result = perform_bridge_changes([
|
||||
#{action => RemoveFun, data => Removed},
|
||||
#{
|
||||
action => CreateFun,
|
||||
data => Added,
|
||||
on_exception_fn => fun emqx_bridge_resource:remove/4
|
||||
},
|
||||
#{action => UpdateFun, data => Updated}
|
||||
]),
|
||||
ok = unload_message_publish_hook(),
|
||||
ok = load_message_publish_hook(NewConf),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
Result;
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end;
|
||||
post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
|
||||
Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]),
|
||||
ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf),
|
||||
|
@ -683,22 +704,36 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf,
|
|||
?tp(bridge_post_config_update_done, #{}),
|
||||
ok;
|
||||
post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
|
||||
ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
||||
Bridges = emqx_utils_maps:deep_put(
|
||||
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
||||
),
|
||||
reload_message_publish_hook(Bridges),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
ok;
|
||||
%% N.B.: all bridges must use the same field name (`connector`) to define the
|
||||
%% connector name.
|
||||
ConnectorName = maps:get(connector, NewConf),
|
||||
case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
||||
ok ->
|
||||
ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
||||
Bridges = emqx_utils_maps:deep_put(
|
||||
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
||||
),
|
||||
reload_message_publish_hook(Bridges),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
ok;
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end;
|
||||
post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
|
||||
ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
|
||||
ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
||||
Bridges = emqx_utils_maps:deep_put(
|
||||
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
||||
),
|
||||
reload_message_publish_hook(Bridges),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
ok.
|
||||
ConnectorName = maps:get(connector, NewConf),
|
||||
case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
||||
ok ->
|
||||
ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
|
||||
ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
||||
Bridges = emqx_utils_maps:deep_put(
|
||||
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
||||
),
|
||||
reload_message_publish_hook(Bridges),
|
||||
?tp(bridge_post_config_update_done, #{}),
|
||||
ok;
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
diff_confs(NewConfs, OldConfs) ->
|
||||
emqx_utils_maps:diff_maps(
|
||||
|
@ -1088,3 +1123,51 @@ extract_connector_id_from_bridge_v2_id(Id) ->
|
|||
_X ->
|
||||
error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))})
|
||||
end.
|
||||
|
||||
to_existing_atom(X) ->
|
||||
case emqx_utils:safe_to_existing_atom(X, utf8) of
|
||||
{ok, A} -> A;
|
||||
{error, _} -> throw(bad_atom)
|
||||
end.
|
||||
|
||||
validate_referenced_connectors(Type0, ConnectorName0, BridgeName) ->
|
||||
%% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
|
||||
%% identical to its matching connector type name.
|
||||
try
|
||||
Type = to_existing_atom(Type0),
|
||||
ConnectorName = to_existing_atom(ConnectorName0),
|
||||
case emqx_config:get([connectors, Type, ConnectorName], undefined) of
|
||||
undefined ->
|
||||
{error, #{
|
||||
reason => "connector_not_found_or_wrong_type",
|
||||
type => Type,
|
||||
bridge_name => BridgeName,
|
||||
connector_name => ConnectorName
|
||||
}};
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
catch
|
||||
throw:bad_atom ->
|
||||
{error, #{
|
||||
reason => "connector_not_found_or_wrong_type",
|
||||
type => Type0,
|
||||
bridge_name => BridgeName,
|
||||
connector_name => ConnectorName0
|
||||
}}
|
||||
end.
|
||||
|
||||
multi_validate_referenced_connectors(Configs) ->
|
||||
Pipeline =
|
||||
lists:map(
|
||||
fun({Type, BridgeName, #{connector := ConnectorName}}) ->
|
||||
fun(_) -> validate_referenced_connectors(Type, ConnectorName, BridgeName) end
|
||||
end,
|
||||
Configs
|
||||
),
|
||||
case emqx_utils:pipeline(Pipeline, unused, unused) of
|
||||
{ok, _, _} ->
|
||||
ok;
|
||||
{error, Reason, _State} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
|
|
@ -21,11 +21,13 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
con_mod() ->
|
||||
emqx_bridge_v2_test_connector.
|
||||
|
||||
con_type() ->
|
||||
test_connector_type.
|
||||
bridge_type().
|
||||
|
||||
con_name() ->
|
||||
my_connector.
|
||||
|
@ -60,9 +62,13 @@ con_config() ->
|
|||
}.
|
||||
|
||||
bridge_schema() ->
|
||||
bridge_schema(_Opts = #{}).
|
||||
|
||||
bridge_schema(Opts) ->
|
||||
Type = maps:get(bridge_type, Opts, bridge_type()),
|
||||
[
|
||||
{
|
||||
bridge_type(),
|
||||
Type,
|
||||
hoconsc:mk(
|
||||
hoconsc:map(name, typerefl:map()),
|
||||
#{
|
||||
|
@ -94,49 +100,84 @@ all() ->
|
|||
|
||||
start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
%% Setting up mocks for fake connector and bridge V2
|
||||
meck:new(emqx_connector_schema, [passthrough, no_link]),
|
||||
setup_mocks() ->
|
||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||
|
||||
catch meck:new(emqx_connector_schema, MeckOpts),
|
||||
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
|
||||
|
||||
meck:new(emqx_connector_resource, [passthrough, no_link]),
|
||||
catch meck:new(emqx_connector_resource, MeckOpts),
|
||||
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
|
||||
|
||||
meck:new(emqx_bridge_v2_schema, [passthrough, no_link]),
|
||||
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
|
||||
meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
|
||||
|
||||
meck:new(emqx_bridge_v2, [passthrough, no_link]),
|
||||
catch meck:new(emqx_bridge_v2, MeckOpts),
|
||||
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
||||
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
||||
|
||||
_ = application:load(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps(start_apps()),
|
||||
ok.
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
app_specs(),
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(apps, Config),
|
||||
emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
app_specs() ->
|
||||
[
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_connector,
|
||||
emqx_bridge
|
||||
].
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
%% Setting up mocks for fake connector and bridge V2
|
||||
setup_mocks(),
|
||||
ets:new(fun_table_name(), [named_table, public]),
|
||||
%% Create a fake connector
|
||||
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
||||
[
|
||||
{mocked_mods, [
|
||||
emqx_connector_schema,
|
||||
emqx_connector_resource,
|
||||
emqx_bridge_v2_schema,
|
||||
|
||||
emqx_bridge_v2
|
||||
]}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
MockedMods = proplists:get_value(mocked_mods, Config),
|
||||
meck:unload(MockedMods),
|
||||
emqx_common_test_helpers:stop_apps(start_apps()).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
ets:new(fun_table_name(), [named_table, public]),
|
||||
%% Create a fake connector
|
||||
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ets:delete(fun_table_name()),
|
||||
%% Remove the fake connector
|
||||
{ok, _} = emqx_connector:remove(con_type(), con_name()),
|
||||
Config.
|
||||
delete_all_bridges_and_connectors(),
|
||||
meck:unload(),
|
||||
emqx_common_test_helpers:call_janitor(),
|
||||
ok.
|
||||
|
||||
delete_all_bridges_and_connectors() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
ct:pal("removing bridge ~p", [{Type, Name}]),
|
||||
emqx_bridge_v2:remove(Type, Name)
|
||||
end,
|
||||
emqx_bridge_v2:list()
|
||||
),
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
ct:pal("removing connector ~p", [{Type, Name}]),
|
||||
emqx_connector:remove(Type, Name)
|
||||
end,
|
||||
emqx_connector:list()
|
||||
),
|
||||
emqx_conf:update([bridges_v2], #{}, #{override_to => cluster}),
|
||||
ok.
|
||||
|
||||
%% Hocon does not support placing a fun in a config map so we replace it with a string
|
||||
|
||||
|
@ -381,3 +422,94 @@ get_bridge_v2_alarm_cnt() ->
|
|||
(_) -> false
|
||||
end,
|
||||
length(lists:filter(FilterFun, Alarms)).
|
||||
|
||||
t_load_no_matching_connector(_Config) ->
|
||||
Conf = bridge_config(),
|
||||
BridgeTypeBin = atom_to_binary(bridge_type()),
|
||||
BridgeNameBin0 = <<"my_test_bridge_update">>,
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeNameBin0, Conf)),
|
||||
|
||||
%% updating to invalid reference
|
||||
RootConf0 = #{
|
||||
BridgeTypeBin =>
|
||||
#{BridgeNameBin0 => Conf#{<<"connector">> := <<"unknown">>}}
|
||||
},
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
bridge_name := my_test_bridge_update,
|
||||
connector_name := unknown,
|
||||
type := _,
|
||||
reason := "connector_not_found_or_wrong_type"
|
||||
}}},
|
||||
emqx_conf:update([bridges_v2], RootConf0, #{override_to => cluster})
|
||||
),
|
||||
|
||||
%% creating new with invalid reference
|
||||
BridgeNameBin1 = <<"my_test_bridge_new">>,
|
||||
RootConf1 = #{
|
||||
BridgeTypeBin =>
|
||||
#{BridgeNameBin1 => Conf#{<<"connector">> := <<"unknown">>}}
|
||||
},
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
bridge_name := my_test_bridge_new,
|
||||
connector_name := unknown,
|
||||
type := _,
|
||||
reason := "connector_not_found_or_wrong_type"
|
||||
}}},
|
||||
emqx_conf:update([bridges_v2], RootConf1, #{override_to => cluster})
|
||||
),
|
||||
|
||||
ok.
|
||||
|
||||
t_create_no_matching_connector(_Config) ->
|
||||
Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>},
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
bridge_name := _,
|
||||
connector_name := _,
|
||||
type := _,
|
||||
reason := "connector_not_found_or_wrong_type"
|
||||
}}},
|
||||
emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_create_wrong_connector_type(_Config) ->
|
||||
meck:expect(
|
||||
emqx_bridge_v2_schema,
|
||||
fields,
|
||||
1,
|
||||
bridge_schema(#{bridge_type => wrong_type})
|
||||
),
|
||||
Conf = bridge_config(),
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
bridge_name := _,
|
||||
connector_name := _,
|
||||
type := wrong_type,
|
||||
reason := "connector_not_found_or_wrong_type"
|
||||
}}},
|
||||
emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_update_connector_not_found(_Config) ->
|
||||
Conf = bridge_config(),
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
|
||||
BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>},
|
||||
?assertMatch(
|
||||
{error,
|
||||
{post_config_update, _HandlerMod, #{
|
||||
bridge_name := _,
|
||||
connector_name := _,
|
||||
type := _,
|
||||
reason := "connector_not_found_or_wrong_type"
|
||||
}}},
|
||||
emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)
|
||||
),
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue