From a6568dec758653c330ec1648c7055b43cee0618b Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 18 Jan 2024 19:04:50 +0200 Subject: [PATCH] perf(emqx_bridge/connector): apply post config bridge/connector changes in parallel This can greatly improve the performance when many bridges/connectors are being changed, e.g. when a backup file is being imported. Fixes: EMQX-11751 --- apps/emqx_bridge/src/emqx_bridge.erl | 70 +++++++++++-------- apps/emqx_bridge/src/emqx_bridge_v2.erl | 62 +++++++++------- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 44 ++++++++---- apps/emqx_connector/src/emqx_connector.erl | 70 +++++++++++-------- changes/ce/perf-12354.en.md | 3 + 5 files changed, 155 insertions(+), 94 deletions(-) create mode 100644 changes/ce/perf-12354.en.md diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index e27748610..3cf9a199a 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -284,15 +284,15 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - #{action => fun emqx_bridge_resource:remove/4, data => Removed}, + #{action => fun emqx_bridge_resource:remove/4, action_name => remove, data => Removed}, #{ action => fun emqx_bridge_resource:create/4, + action_name => create, data => Added, on_exception_fn => fun emqx_bridge_resource:remove/4 }, - #{action => fun emqx_bridge_resource:update/4, data => Updated} + #{action => fun emqx_bridge_resource:update/4, action_name => update, data => Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -534,28 +534,21 @@ convert_certs(BridgesConf) -> ). perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). + perform_bridge_changes(Tasks, []). -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_bridge_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for emqx_bridge_resource:update/4 - ({Type, Name}, {OldConf, Conf}, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, {OldConf, Conf}, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - try Action(Type, Name, Conf, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + ResOpts = creation_opts(Conf), + Res = + try + Action(Type, Name, Conf, ResOpts) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -567,13 +560,34 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R stacktrace => Stacktrace }), OnException(Type, Name, Conf, ResOpts), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_bridge_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_bridge_changes(Tasks, Errors). + +creation_opts({_OldConf, Conf}) -> + emqx_resource:fetch_creation_opts(Conf); +creation_opts(Conf) -> + emqx_resource:fetch_creation_opts(Conf). diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index b69882080..522f0aca8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1059,7 +1059,6 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. RemoveFun = fun(Type, Name, Conf) -> uninstall_bridge_v2(ConfRootKey, Type, Name, Conf) end, @@ -1071,13 +1070,14 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when install_bridge_v2(ConfRootKey, Type, Name, Conf) end, Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, + #{action => RemoveFun, action_name => remove, data => Removed}, #{ action => CreateFun, + action_name => create, data => Added, on_exception_fn => fun emqx_bridge_resource:remove/4 }, - #{action => UpdateFun, data => Updated} + #{action => UpdateFun, action_name => update, data => Updated} ]), reload_message_publish_hook(NewConf), ?tp(bridge_post_config_update_done, #{}), @@ -1141,26 +1141,20 @@ do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). + perform_bridge_changes(Tasks, []). -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_bridge_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for update - ({Type, Name}, {OldConf, Conf}, _) -> - case Action(Type, Name, {OldConf, Conf}) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - try Action(Type, Name, Conf) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + Res = + try + Action(Type, Name, Conf) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -1172,13 +1166,29 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R stacktrace => Stacktrace }), OnException(Type, Name, Conf), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_bridge_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_bridge_changes(Tasks, Errors). fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index f3b8a29d7..ba631f71a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -606,12 +606,22 @@ t_load_no_matching_connector(_Config) -> }, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ - bridge_name := my_test_bridge_update, - connector_name := <<"unknown">>, - bridge_type := _, - reason := <<"connector_not_found_or_wrong_type">> - }}}, + {post_config_update, _HandlerMod, [ + #{ + errors := [ + { + {_, my_test_bridge_update}, + {error, #{ + bridge_name := my_test_bridge_update, + connector_name := <<"unknown">>, + bridge_type := _, + reason := <<"connector_not_found_or_wrong_type">> + }} + } + ], + action := update + } + ]}}, update_root_config(RootConf0) ), @@ -623,12 +633,22 @@ t_load_no_matching_connector(_Config) -> }, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ - bridge_name := my_test_bridge_new, - connector_name := <<"unknown">>, - bridge_type := _, - reason := <<"connector_not_found_or_wrong_type">> - }}}, + {post_config_update, _HandlerMod, [ + #{ + errors := [ + { + {_, my_test_bridge_new}, + {error, #{ + bridge_name := my_test_bridge_new, + connector_name := <<"unknown">>, + bridge_type := _, + reason := <<"connector_not_found_or_wrong_type">> + }} + } + ], + action := create + } + ]}}, update_root_config(RootConf1) ), diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 92cf9439e..ba2e3106f 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -169,16 +169,16 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ?tp(connector_post_config_update_done, #{}), ok. -%% The config update will be failed if any task in `perform_connector_changes` failed. perform_connector_changes(Removed, Added, Updated) -> Result = perform_connector_changes([ - #{action => fun emqx_connector_resource:remove/4, data => Removed}, + #{action => fun emqx_connector_resource:remove/4, action_name => remove, data => Removed}, #{ action => fun emqx_connector_resource:create/4, + action_name => create, data => Added, on_exception_fn => fun emqx_connector_resource:remove/4 }, - #{action => fun emqx_connector_resource:update/4, data => Updated} + #{action => fun emqx_connector_resource:update/4, action_name => update, data => Updated} ]), ?tp(connector_post_config_update_done, #{}), Result. @@ -351,28 +351,21 @@ convert_certs(ConnectorsConf) -> ). perform_connector_changes(Tasks) -> - perform_connector_changes(Tasks, ok). + perform_connector_changes(Tasks, []). -perform_connector_changes([], Result) -> - Result; -perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_connector_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for emqx_connector_resource:update/4 - ({Type, Name}, {OldConf, Conf}, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, {OldConf, Conf}, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - try Action(Type, Name, Conf, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + ResOpts = creation_opts(Conf), + Res = + try + Action(Type, Name, Conf, ResOpts) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -384,13 +377,34 @@ perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks] stacktrace => Stacktrace }), OnException(Type, Name, Conf, ResOpts), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_connector_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_connector_changes(Tasks, Errors). + +creation_opts({_OldConf, Conf}) -> + emqx_resource:fetch_creation_opts(Conf); +creation_opts(Conf) -> + emqx_resource:fetch_creation_opts(Conf). diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( diff --git a/changes/ce/perf-12354.en.md b/changes/ce/perf-12354.en.md new file mode 100644 index 000000000..ac0be69a0 --- /dev/null +++ b/changes/ce/perf-12354.en.md @@ -0,0 +1,3 @@ +Apply post config bridge changes in parallel. +This can greatly improve the performance when multiple bridges are being changed, +e.g. when a backup file is being imported.