From dc15d37dcc0550296def87887b7c42a077f9e256 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 19 Jan 2024 19:53:01 +0200 Subject: [PATCH] perf(emqx_bridge/connector): load and unload bridges/connectors in parallel This should reduce app start/stop time, when a large number of bridges/connectors are not healthy. --- apps/emqx_bridge/src/emqx_bridge.erl | 20 ++++++++++------- apps/emqx_bridge/src/emqx_bridge_v2.erl | 26 +++++++++++++--------- apps/emqx_connector/src/emqx_connector.erl | 20 ++++++++++------- 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3cf9a199a..f48a44df2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -103,33 +103,37 @@ load() -> Bridges = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, Conf}) -> %% fetch opts for `emqx_resource_buffer_worker` ResOpts = emqx_resource:fetch_creation_opts(Conf), safe_load_bridge(Type, Name, Conf, ResOpts) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Bridges) + maps:to_list(Bridges), + infinity ). unload() -> unload_hook(), Bridges = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, _Conf}) -> _ = emqx_bridge_resource:stop(Type, Name) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Bridges) + maps:to_list(Bridges), + infinity ). safe_load_bridge(Type, Name, Conf, Opts) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 522f0aca8..622dbf464 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -182,17 +182,20 @@ load() -> load_bridges(RootName) -> Bridges = emqx:get_config([RootName], #{}), - lists:foreach( + _ = emqx_utils:pmap( fun({Type, Bridge}) -> - lists:foreach( + emqx_utils:pmap( fun({Name, BridgeConf}) -> install_bridge_v2(RootName, Type, Name, BridgeConf) end, - maps:to_list(Bridge) + maps:to_list(Bridge), + infinity ) end, - maps:to_list(Bridges) - ). + maps:to_list(Bridges), + infinity + ), + ok. unload() -> unload_bridges(?ROOT_KEY_ACTIONS), @@ -204,17 +207,20 @@ unload() -> unload_bridges(ConfRooKey) -> Bridges = emqx:get_config([ConfRooKey], #{}), - lists:foreach( + _ = emqx_utils:pmap( fun({Type, Bridge}) -> - lists:foreach( + emqx_utils:pmap( fun({Name, BridgeConf}) -> uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf) end, - maps:to_list(Bridge) + maps:to_list(Bridge), + infinity ) end, - maps:to_list(Bridges) - ). + maps:to_list(Bridges), + infinity + ), + ok. %%==================================================================== %% CRUD API diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index ba2e3106f..f4fee3eac 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -54,30 +54,34 @@ load() -> Connectors = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, Conf}) -> safe_load_connector(Type, Name, Conf) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Connectors) + maps:to_list(Connectors), + infinity ). unload() -> Connectors = emqx:get_config([?ROOT_KEY], #{}), - lists:foreach( + emqx_utils:pforeach( fun({Type, NamedConf}) -> - lists:foreach( + emqx_utils:pforeach( fun({Name, _Conf}) -> _ = emqx_connector_resource:stop(Type, Name) end, - maps:to_list(NamedConf) + maps:to_list(NamedConf), + infinity ) end, - maps:to_list(Connectors) + maps:to_list(Connectors), + infinity ). safe_load_connector(Type, Name, Conf) ->