Merge pull request #12214 from zhongwencool/update-actions-connectors

fix: emqx conf ctl load failed with connectors/actions
This commit is contained in:
Zaiming (Stone) Shi 2023-12-21 17:39:44 +01:00 committed by GitHub
commit d9b3280f30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 65 deletions

View File

@ -416,7 +416,7 @@ uninstall_bridge_v2(
{error, _} ->
ok;
ok ->
%% Deinstall from connector
%% uninstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
),
@ -869,6 +869,8 @@ config_key_path() ->
config_key_path_leaf() ->
[?ROOT_KEY, '?', '?'].
pre_config_update(_, {force_update, Conf}, _OldConf) ->
{ok, Conf};
%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the
%% underlying resources.
pre_config_update(_, {_Oper, _, _}, undefined) ->
@ -882,55 +884,15 @@ pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) ->
operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true.
%% A public API that can trigger this is:
%% bin/emqx ctl conf load data/configs/cluster.hocon
post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) ->
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false});
%% This top level handler will be triggered when the actions path is updated
%% with calls to emqx_conf:update([actions], BridgesConf, #{}).
%%
%% A public API that can trigger this is:
%% bin/emqx ctl conf load data/configs/cluster.hocon
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
%% new and updated bridges must have their connector references validated
UpdatedConfigs =
lists:map(
fun({{Type, BridgeName}, {_Old, New}}) ->
{Type, BridgeName, New}
end,
maps:to_list(Updated)
),
AddedConfigs =
lists:map(
fun({{Type, BridgeName}, AddedConf}) ->
{Type, BridgeName, AddedConf}
end,
maps:to_list(Added)
),
ToValidate = UpdatedConfigs ++ AddedConfigs,
case multi_validate_referenced_connectors(ToValidate) of
ok ->
%% The config update will be failed if any task in `perform_bridge_changes` failed.
RemoveFun = fun uninstall_bridge_v2/3,
CreateFun = fun install_bridge_v2/3,
UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
uninstall_bridge_v2(Type, Name, OldBridgeConf),
install_bridge_v2(Type, Name, Conf)
end,
Result = perform_bridge_changes([
#{action => RemoveFun, data => Removed},
#{
action => CreateFun,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => UpdateFun, data => Updated}
]),
ok = unload_message_publish_hook(),
ok = load_message_publish_hook(NewConf),
?tp(bridge_post_config_update_done, #{}),
Result;
{error, Error} ->
{error, Error}
end;
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true});
post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]),
ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf),
@ -970,6 +932,50 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf,
{error, Error}
end.
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
UpdatedConfigs =
lists:map(
fun({{Type, BridgeName}, {_Old, New}}) ->
{Type, BridgeName, New}
end,
maps:to_list(Updated)
),
AddedConfigs =
lists:map(
fun({{Type, BridgeName}, AddedConf}) ->
{Type, BridgeName, AddedConf}
end,
maps:to_list(Added)
),
ToValidate = UpdatedConfigs ++ AddedConfigs,
case multi_validate_referenced_connectors(NeedValidate, ToValidate) of
ok ->
%% The config update will be failed if any task in `perform_bridge_changes` failed.
RemoveFun = fun uninstall_bridge_v2/3,
CreateFun = fun install_bridge_v2/3,
UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
uninstall_bridge_v2(Type, Name, OldBridgeConf),
install_bridge_v2(Type, Name, Conf)
end,
Result = perform_bridge_changes([
#{action => RemoveFun, data => Removed},
#{
action => CreateFun,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => UpdateFun, data => Updated}
]),
ok = unload_message_publish_hook(),
ok = load_message_publish_hook(NewConf),
?tp(bridge_post_config_update_done, #{}),
Result;
{error, Error} ->
{error, Error}
end.
diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps(
flatten_confs(NewConfs),
@ -1600,7 +1606,9 @@ to_connector(ConnectorNameBin, BridgeType) ->
throw(not_found)
end.
multi_validate_referenced_connectors(Configs) ->
multi_validate_referenced_connectors(false, _Configs) ->
ok;
multi_validate_referenced_connectors(true, Configs) ->
Pipeline =
lists:map(
fun({Type, BridgeName, #{connector := ConnectorName}}) ->

View File

@ -286,9 +286,16 @@ update_config_cluster(
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf),
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
Request = make_request(Key, Merged),
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts);
update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
Request = make_request(Key, Value),
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts).
make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> ->
{force_update, Value};
make_request(_Key, Value) ->
Value.
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
update_config_local(
@ -305,9 +312,11 @@ update_config_local(
check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts);
update_config_local(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf),
check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts);
Request = make_request(Key, Merged),
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts);
update_config_local(Key, Value, #{mode := replace} = Opts) ->
check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts).
Request = make_request(Key, Value),
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts).
check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts).
check_res(Node, Key, {ok, _}, _Conf, Opts) ->

View File

@ -107,6 +107,8 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], {force_update, NewConf}, RawConf) ->
pre_config_update([?ROOT_KEY], NewConf, RawConf);
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
case multi_validate_connector_names(NewConf) of
ok ->
@ -135,23 +137,16 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true.
post_config_update([?ROOT_KEY], {force_update, _}, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
perform_connector_changes(Removed, Added, Updated);
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
case ensure_no_channels(Removed) of
ok ->
%% The config update will be failed if any task in `perform_connector_changes` failed.
Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed},
#{
action => fun emqx_connector_resource:create/4,
data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4
},
#{action => fun emqx_connector_resource:update/4, data => Updated}
]),
?tp(connector_post_config_update_done, #{}),
Result;
perform_connector_changes(Removed, Added, Updated);
{error, Error} ->
{error, Error}
end;
@ -175,6 +170,20 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
?tp(connector_post_config_update_done, #{}),
ok.
%% The config update will be failed if any task in `perform_connector_changes` failed.
perform_connector_changes(Removed, Added, Updated) ->
Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed},
#{
action => fun emqx_connector_resource:create/4,
data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4
},
#{action => fun emqx_connector_resource:update/4, data => Updated}
]),
?tp(connector_post_config_update_done, #{}),
Result.
list() ->
maps:fold(
fun(Type, NameAndConf, Connectors) ->

View File

@ -378,8 +378,13 @@ t_get_configs_in_different_accept(_Config) ->
?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)).
t_create_webhook_v1_bridges_api({'init', Config}) ->
application:ensure_all_started(emqx_connector),
application:ensure_all_started(emqx_bridge),
lists:foreach(
fun(App) ->
_ = application:stop(App),
{ok, [App]} = application:ensure_all_started(App)
end,
[emqx_connector, emqx_bridge]
),
Config;
t_create_webhook_v1_bridges_api({'end', _}) ->
application:stop(emqx_bridge),