diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index e27748610..f48a44df2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -103,33 +103,37 @@ load() -> Bridges = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, Conf}) -> %% fetch opts for `emqx_resource_buffer_worker` ResOpts = emqx_resource:fetch_creation_opts(Conf), safe_load_bridge(Type, Name, Conf, ResOpts) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Bridges) + maps:to_list(Bridges), + infinity ). unload() -> unload_hook(), Bridges = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, _Conf}) -> _ = emqx_bridge_resource:stop(Type, Name) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Bridges) + maps:to_list(Bridges), + infinity ). safe_load_bridge(Type, Name, Conf, Opts) -> @@ -284,15 +288,15 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - #{action => fun emqx_bridge_resource:remove/4, data => Removed}, + #{action => fun emqx_bridge_resource:remove/4, action_name => remove, data => Removed}, #{ action => fun emqx_bridge_resource:create/4, + action_name => create, data => Added, on_exception_fn => fun emqx_bridge_resource:remove/4 }, - #{action => fun emqx_bridge_resource:update/4, data => Updated} + #{action => fun emqx_bridge_resource:update/4, action_name => update, data => Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -534,28 +538,21 @@ convert_certs(BridgesConf) -> ). perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). + perform_bridge_changes(Tasks, []). -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_bridge_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for emqx_bridge_resource:update/4 - ({Type, Name}, {OldConf, Conf}, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, {OldConf, Conf}, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - try Action(Type, Name, Conf, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + ResOpts = creation_opts(Conf), + Res = + try + Action(Type, Name, Conf, ResOpts) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -567,13 +564,34 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R stacktrace => Stacktrace }), OnException(Type, Name, Conf, ResOpts), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_bridge_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_bridge_changes(Tasks, Errors). + +creation_opts({_OldConf, Conf}) -> + emqx_resource:fetch_creation_opts(Conf); +creation_opts(Conf) -> + emqx_resource:fetch_creation_opts(Conf). diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index b69882080..622dbf464 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -182,17 +182,20 @@ load() -> load_bridges(RootName) -> Bridges = emqx:get_config([RootName], #{}), - lists:foreach( + _ = emqx_utils:pmap( fun({Type, Bridge}) -> - lists:foreach( + emqx_utils:pmap( fun({Name, BridgeConf}) -> install_bridge_v2(RootName, Type, Name, BridgeConf) end, - maps:to_list(Bridge) + maps:to_list(Bridge), + infinity ) end, - maps:to_list(Bridges) - ). + maps:to_list(Bridges), + infinity + ), + ok. unload() -> unload_bridges(?ROOT_KEY_ACTIONS), @@ -204,17 +207,20 @@ unload() -> unload_bridges(ConfRooKey) -> Bridges = emqx:get_config([ConfRooKey], #{}), - lists:foreach( + _ = emqx_utils:pmap( fun({Type, Bridge}) -> - lists:foreach( + emqx_utils:pmap( fun({Name, BridgeConf}) -> uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf) end, - maps:to_list(Bridge) + maps:to_list(Bridge), + infinity ) end, - maps:to_list(Bridges) - ). + maps:to_list(Bridges), + infinity + ), + ok. %%==================================================================== %% CRUD API @@ -1059,7 +1065,6 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. RemoveFun = fun(Type, Name, Conf) -> uninstall_bridge_v2(ConfRootKey, Type, Name, Conf) end, @@ -1071,13 +1076,14 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when install_bridge_v2(ConfRootKey, Type, Name, Conf) end, Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, + #{action => RemoveFun, action_name => remove, data => Removed}, #{ action => CreateFun, + action_name => create, data => Added, on_exception_fn => fun emqx_bridge_resource:remove/4 }, - #{action => UpdateFun, data => Updated} + #{action => UpdateFun, action_name => update, data => Updated} ]), reload_message_publish_hook(NewConf), ?tp(bridge_post_config_update_done, #{}), @@ -1141,26 +1147,20 @@ do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). + perform_bridge_changes(Tasks, []). -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_bridge_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for update - ({Type, Name}, {OldConf, Conf}, _) -> - case Action(Type, Name, {OldConf, Conf}) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - try Action(Type, Name, Conf) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + Res = + try + Action(Type, Name, Conf) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -1172,13 +1172,29 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R stacktrace => Stacktrace }), OnException(Type, Name, Conf), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_bridge_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_bridge_changes(Tasks, Errors). fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index f3b8a29d7..ba631f71a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -606,12 +606,22 @@ t_load_no_matching_connector(_Config) -> }, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ - bridge_name := my_test_bridge_update, - connector_name := <<"unknown">>, - bridge_type := _, - reason := <<"connector_not_found_or_wrong_type">> - }}}, + {post_config_update, _HandlerMod, [ + #{ + errors := [ + { + {_, my_test_bridge_update}, + {error, #{ + bridge_name := my_test_bridge_update, + connector_name := <<"unknown">>, + bridge_type := _, + reason := <<"connector_not_found_or_wrong_type">> + }} + } + ], + action := update + } + ]}}, update_root_config(RootConf0) ), @@ -623,12 +633,22 @@ t_load_no_matching_connector(_Config) -> }, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ - bridge_name := my_test_bridge_new, - connector_name := <<"unknown">>, - bridge_type := _, - reason := <<"connector_not_found_or_wrong_type">> - }}}, + {post_config_update, _HandlerMod, [ + #{ + errors := [ + { + {_, my_test_bridge_new}, + {error, #{ + bridge_name := my_test_bridge_new, + connector_name := <<"unknown">>, + bridge_type := _, + reason := <<"connector_not_found_or_wrong_type">> + }} + } + ], + action := create + } + ]}}, update_root_config(RootConf1) ), diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 92cf9439e..f4fee3eac 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -54,30 +54,34 @@ load() -> Connectors = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, Conf}) -> safe_load_connector(Type, Name, Conf) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Connectors) + maps:to_list(Connectors), + infinity ). unload() -> Connectors = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, _Conf}) -> _ = emqx_connector_resource:stop(Type, Name) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Connectors) + maps:to_list(Connectors), + infinity ). safe_load_connector(Type, Name, Conf) -> @@ -169,16 +173,16 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ?tp(connector_post_config_update_done, #{}), ok. -%% The config update will be failed if any task in `perform_connector_changes` failed. perform_connector_changes(Removed, Added, Updated) -> Result = perform_connector_changes([ - #{action => fun emqx_connector_resource:remove/4, data => Removed}, + #{action => fun emqx_connector_resource:remove/4, action_name => remove, data => Removed}, #{ action => fun emqx_connector_resource:create/4, + action_name => create, data => Added, on_exception_fn => fun emqx_connector_resource:remove/4 }, - #{action => fun emqx_connector_resource:update/4, data => Updated} + #{action => fun emqx_connector_resource:update/4, action_name => update, data => Updated} ]), ?tp(connector_post_config_update_done, #{}), Result. @@ -351,28 +355,21 @@ convert_certs(ConnectorsConf) -> ). perform_connector_changes(Tasks) -> - perform_connector_changes(Tasks, ok). + perform_connector_changes(Tasks, []). -perform_connector_changes([], Result) -> - Result; -perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> +perform_connector_changes([], Errors) -> + case Errors of + [] -> ok; + _ -> {error, Errors} + end; +perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) -> OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for emqx_connector_resource:update/4 - ({Type, Name}, {OldConf, Conf}, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, {OldConf, Conf}, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - ResOpts = emqx_resource:fetch_creation_opts(Conf), - try Action(Type, Name, Conf, ResOpts) of - {error, Reason} -> {error, Reason}; - Return -> Return + Results = emqx_utils:pmap( + fun({{Type, Name}, Conf}) -> + ResOpts = creation_opts(Conf), + Res = + try + Action(Type, Name, Conf, ResOpts) catch Kind:Error:Stacktrace -> ?SLOG(error, #{ @@ -384,13 +381,34 @@ perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks] stacktrace => Stacktrace }), OnException(Type, Name, Conf, ResOpts), - erlang:raise(Kind, Error, Stacktrace) - end + {error, Error} + end, + {{Type, Name}, Res} end, - Result0, - MapConfs + maps:to_list(MapConfs), + infinity ), - perform_connector_changes(Tasks, Result). + Errs = lists:filter( + fun + ({_TypeName, {error, _}}) -> true; + (_) -> false + end, + Results + ), + Errors = + case Errs of + [] -> + Errors0; + _ -> + #{action_name := ActionName} = Task, + [#{action => ActionName, errors => Errs} | Errors0] + end, + perform_connector_changes(Tasks, Errors). + +creation_opts({_OldConf, Conf}) -> + emqx_resource:fetch_creation_opts(Conf); +creation_opts(Conf) -> + emqx_resource:fetch_creation_opts(Conf). diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 8d7c622a4..0eeef2e5e 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -51,6 +51,8 @@ gen_id/0, gen_id/1, explain_posix/1, + pforeach/2, + pforeach/3, pmap/2, pmap/3, readable_error_msg/1, @@ -423,6 +425,15 @@ explain_posix(estale) -> "Stale remote file handle"; explain_posix(exdev) -> "Cross-domain link"; explain_posix(NotPosix) -> NotPosix. +-spec pforeach(fun((A) -> term()), list(A)) -> ok. +pforeach(Fun, List) when is_function(Fun, 1), is_list(List) -> + pforeach(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pforeach(fun((A) -> term()), list(A), timeout()) -> ok. +pforeach(Fun, List, Timeout) -> + _ = pmap(Fun, List, Timeout), + ok. + %% @doc Like lists:map/2, only the callback function is evaluated %% concurrently. -spec pmap(fun((A) -> B), list(A)) -> list(B). @@ -431,7 +442,9 @@ pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> -spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). pmap(Fun, List, Timeout) when - is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 + is_function(Fun, 1), + is_list(List), + (is_integer(Timeout) andalso Timeout >= 0 orelse Timeout =:= infinity) -> nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). diff --git a/changes/ce/perf-12354.en.md b/changes/ce/perf-12354.en.md new file mode 100644 index 000000000..ac0be69a0 --- /dev/null +++ b/changes/ce/perf-12354.en.md @@ -0,0 +1,3 @@ +Apply post config bridge changes in parallel. +This can greatly improve the performance when multiple bridges are being changed, +e.g. when a backup file is being imported.