From 37930f1d3c0ef538a88dc3c156712a06f5e0bbaa Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 21 Dec 2023 19:46:22 +0800 Subject: [PATCH] fix: emqx conf ctl load failed with connectors/actions --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 102 ++++++++++-------- apps/emqx_conf/src/emqx_conf_cli.erl | 17 ++- apps/emqx_connector/src/emqx_connector.erl | 33 +++--- .../test/emqx_mgmt_api_configs_SUITE.erl | 9 +- 4 files changed, 96 insertions(+), 65 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 9436ac0ea..cd6172eda 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -416,7 +416,7 @@ uninstall_bridge_v2( {error, _} -> ok; ok -> - %% Deinstall from connector + %% uninstall from connector ConnectorId = emqx_connector_resource:resource_id( connector_type(BridgeV2Type), ConnectorName ), @@ -869,6 +869,8 @@ config_key_path() -> config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. +pre_config_update(_, {force_update, Conf}, _OldConf) -> + {ok, Conf}; %% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the %% underlying resources. pre_config_update(_, {_Oper, _, _}, undefined) -> @@ -882,55 +884,15 @@ pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) -> operation_to_enable(disable) -> false; operation_to_enable(enable) -> true. +%% A public API that can trigger this is: +%% bin/emqx ctl conf load data/configs/cluster.hocon +post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) -> + do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false}); %% This top level handler will be triggered when the actions path is updated %% with calls to emqx_conf:update([actions], BridgesConf, #{}). %% -%% A public API that can trigger this is: -%% bin/emqx ctl conf load data/configs/cluster.hocon post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> - #{added := Added, removed := Removed, changed := Updated} = - diff_confs(NewConf, OldConf), - %% new and updated bridges must have their connector references validated - UpdatedConfigs = - lists:map( - fun({{Type, BridgeName}, {_Old, New}}) -> - {Type, BridgeName, New} - end, - maps:to_list(Updated) - ), - AddedConfigs = - lists:map( - fun({{Type, BridgeName}, AddedConf}) -> - {Type, BridgeName, AddedConf} - end, - maps:to_list(Added) - ), - ToValidate = UpdatedConfigs ++ AddedConfigs, - case multi_validate_referenced_connectors(ToValidate) of - ok -> - %% The config update will be failed if any task in `perform_bridge_changes` failed. - RemoveFun = fun uninstall_bridge_v2/3, - CreateFun = fun install_bridge_v2/3, - UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> - uninstall_bridge_v2(Type, Name, OldBridgeConf), - install_bridge_v2(Type, Name, Conf) - end, - Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, - #{ - action => CreateFun, - data => Added, - on_exception_fn => fun emqx_bridge_resource:remove/4 - }, - #{action => UpdateFun, data => Updated} - ]), - ok = unload_message_publish_hook(), - ok = load_message_publish_hook(NewConf), - ?tp(bridge_post_config_update_done, #{}), - Result; - {error, Error} -> - {error, Error} - end; + do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true}); post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), @@ -970,6 +932,50 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, {error, Error} end. +do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) -> + #{added := Added, removed := Removed, changed := Updated} = + diff_confs(NewConf, OldConf), + UpdatedConfigs = + lists:map( + fun({{Type, BridgeName}, {_Old, New}}) -> + {Type, BridgeName, New} + end, + maps:to_list(Updated) + ), + AddedConfigs = + lists:map( + fun({{Type, BridgeName}, AddedConf}) -> + {Type, BridgeName, AddedConf} + end, + maps:to_list(Added) + ), + ToValidate = UpdatedConfigs ++ AddedConfigs, + case multi_validate_referenced_connectors(NeedValidate, ToValidate) of + ok -> + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + ok = unload_message_publish_hook(), + ok = load_message_publish_hook(NewConf), + ?tp(bridge_post_config_update_done, #{}), + Result; + {error, Error} -> + {error, Error} + end. + diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( flatten_confs(NewConfs), @@ -1600,7 +1606,9 @@ to_connector(ConnectorNameBin, BridgeType) -> throw(not_found) end. -multi_validate_referenced_connectors(Configs) -> +multi_validate_referenced_connectors(false, _Configs) -> + ok; +multi_validate_referenced_connectors(true, Configs) -> Pipeline = lists:map( fun({Type, BridgeName, #{connector := ConnectorName}}) -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 07f5b034d..f2ced3327 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -286,9 +286,16 @@ update_config_cluster( check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts); + Request = make_request(Key, Merged), + check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts); update_config_cluster(Key, Value, #{mode := replace} = Opts) -> - check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). + Request = make_request(Key, Value), + check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts). + +make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> -> + {force_update, Value}; +make_request(_Key, Value) -> + Value. -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( @@ -305,9 +312,11 @@ update_config_local( check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); update_config_local(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts); + Request = make_request(Key, Merged), + check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts); update_config_local(Key, Value, #{mode := replace} = Opts) -> - check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts). + Request = make_request(Key, Value), + check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Node, Key, {ok, _}, _Conf, Opts) -> diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 30654bb13..dac85273a 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -107,6 +107,8 @@ config_key_path() -> pre_config_update([?ROOT_KEY], RawConf, RawConf) -> {ok, RawConf}; +pre_config_update([?ROOT_KEY], {force_update, NewConf}, RawConf) -> + pre_config_update([?ROOT_KEY], NewConf, RawConf); pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> case multi_validate_connector_names(NewConf) of ok -> @@ -135,23 +137,16 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> operation_to_enable(disable) -> false; operation_to_enable(enable) -> true. +post_config_update([?ROOT_KEY], {force_update, _}, NewConf, OldConf, _AppEnv) -> + #{added := Added, removed := Removed, changed := Updated} = + diff_confs(NewConf, OldConf), + perform_connector_changes(Removed, Added, Updated); post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), case ensure_no_channels(Removed) of ok -> - %% The config update will be failed if any task in `perform_connector_changes` failed. - Result = perform_connector_changes([ - #{action => fun emqx_connector_resource:remove/4, data => Removed}, - #{ - action => fun emqx_connector_resource:create/4, - data => Added, - on_exception_fn => fun emqx_connector_resource:remove/4 - }, - #{action => fun emqx_connector_resource:update/4, data => Updated} - ]), - ?tp(connector_post_config_update_done, #{}), - Result; + perform_connector_changes(Removed, Added, Updated); {error, Error} -> {error, Error} end; @@ -175,6 +170,20 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ?tp(connector_post_config_update_done, #{}), ok. +%% The config update will be failed if any task in `perform_connector_changes` failed. +perform_connector_changes(Removed, Added, Updated) -> + Result = perform_connector_changes([ + #{action => fun emqx_connector_resource:remove/4, data => Removed}, + #{ + action => fun emqx_connector_resource:create/4, + data => Added, + on_exception_fn => fun emqx_connector_resource:remove/4 + }, + #{action => fun emqx_connector_resource:update/4, data => Updated} + ]), + ?tp(connector_post_config_update_done, #{}), + Result. + list() -> maps:fold( fun(Type, NameAndConf, Connectors) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index ed50ecba4..2bfff17a6 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -378,8 +378,13 @@ t_get_configs_in_different_accept(_Config) -> ?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)). t_create_webhook_v1_bridges_api({'init', Config}) -> - application:ensure_all_started(emqx_connector), - application:ensure_all_started(emqx_bridge), + lists:foreach( + fun(App) -> + _ = application:stop(App), + {ok, [App]} = application:ensure_all_started(App) + end, + [emqx_connector, emqx_bridge] + ), Config; t_create_webhook_v1_bridges_api({'end', _}) -> application:stop(emqx_bridge),