perf(emqx_bridge/connector): load and unload bridges/connectors in parallel

This should reduce app start/stop time, when a large number of bridges/connectors are not healthy.
This commit is contained in:
Serge Tupchii 2024-01-19 19:53:01 +02:00
parent d3a6870097
commit dc15d37dcc
3 changed files with 40 additions and 26 deletions

View File

@ -103,33 +103,37 @@
load() -> load() ->
Bridges = emqx:get_config([?ROOT_KEY], #{}), Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach( emqx_utils:pforeach(
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( emqx_utils:pforeach(
fun({Name, Conf}) -> fun({Name, Conf}) ->
%% fetch opts for `emqx_resource_buffer_worker` %% fetch opts for `emqx_resource_buffer_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf), ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts) safe_load_bridge(Type, Name, Conf, ResOpts)
end, end,
maps:to_list(NamedConf) maps:to_list(NamedConf),
infinity
) )
end, end,
maps:to_list(Bridges) maps:to_list(Bridges),
infinity
). ).
unload() -> unload() ->
unload_hook(), unload_hook(),
Bridges = emqx:get_config([?ROOT_KEY], #{}), Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach( emqx_utils:pforeach(
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( emqx_utils:pforeach(
fun({Name, _Conf}) -> fun({Name, _Conf}) ->
_ = emqx_bridge_resource:stop(Type, Name) _ = emqx_bridge_resource:stop(Type, Name)
end, end,
maps:to_list(NamedConf) maps:to_list(NamedConf),
infinity
) )
end, end,
maps:to_list(Bridges) maps:to_list(Bridges),
infinity
). ).
safe_load_bridge(Type, Name, Conf, Opts) -> safe_load_bridge(Type, Name, Conf, Opts) ->

View File

@ -182,17 +182,20 @@ load() ->
load_bridges(RootName) -> load_bridges(RootName) ->
Bridges = emqx:get_config([RootName], #{}), Bridges = emqx:get_config([RootName], #{}),
lists:foreach( _ = emqx_utils:pmap(
fun({Type, Bridge}) -> fun({Type, Bridge}) ->
lists:foreach( emqx_utils:pmap(
fun({Name, BridgeConf}) -> fun({Name, BridgeConf}) ->
install_bridge_v2(RootName, Type, Name, BridgeConf) install_bridge_v2(RootName, Type, Name, BridgeConf)
end, end,
maps:to_list(Bridge) maps:to_list(Bridge),
infinity
) )
end, end,
maps:to_list(Bridges) maps:to_list(Bridges),
). infinity
),
ok.
unload() -> unload() ->
unload_bridges(?ROOT_KEY_ACTIONS), unload_bridges(?ROOT_KEY_ACTIONS),
@ -204,17 +207,20 @@ unload() ->
unload_bridges(ConfRooKey) -> unload_bridges(ConfRooKey) ->
Bridges = emqx:get_config([ConfRooKey], #{}), Bridges = emqx:get_config([ConfRooKey], #{}),
lists:foreach( _ = emqx_utils:pmap(
fun({Type, Bridge}) -> fun({Type, Bridge}) ->
lists:foreach( emqx_utils:pmap(
fun({Name, BridgeConf}) -> fun({Name, BridgeConf}) ->
uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf) uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf)
end, end,
maps:to_list(Bridge) maps:to_list(Bridge),
infinity
) )
end, end,
maps:to_list(Bridges) maps:to_list(Bridges),
). infinity
),
ok.
%%==================================================================== %%====================================================================
%% CRUD API %% CRUD API

View File

@ -54,30 +54,34 @@
load() -> load() ->
Connectors = emqx:get_config([?ROOT_KEY], #{}), Connectors = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach( emqx_utils:pforeach(
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( emqx_utils:pforeach(
fun({Name, Conf}) -> fun({Name, Conf}) ->
safe_load_connector(Type, Name, Conf) safe_load_connector(Type, Name, Conf)
end, end,
maps:to_list(NamedConf) maps:to_list(NamedConf),
infinity
) )
end, end,
maps:to_list(Connectors) maps:to_list(Connectors),
infinity
). ).
unload() -> unload() ->
Connectors = emqx:get_config([?ROOT_KEY], #{}), Connectors = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach( emqx_utils:pforeach(
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( emqx_utils:pforeach(
fun({Name, _Conf}) -> fun({Name, _Conf}) ->
_ = emqx_connector_resource:stop(Type, Name) _ = emqx_connector_resource:stop(Type, Name)
end, end,
maps:to_list(NamedConf) maps:to_list(NamedConf),
infinity
) )
end, end,
maps:to_list(Connectors) maps:to_list(Connectors),
infinity
). ).
safe_load_connector(Type, Name, Conf) -> safe_load_connector(Type, Name, Conf) ->