perf(emqx_bridge/connector): apply post config bridge/connector changes in parallel

This can greatly improve the performance when many bridges/connectors are being changed,
e.g. when a backup file is being imported.

Fixes: EMQX-11751
This commit is contained in:
Serge Tupchii 2024-01-18 19:04:50 +02:00
parent 54457b7093
commit a6568dec75
5 changed files with 155 additions and 94 deletions

View File

@ -284,15 +284,15 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} = #{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf), diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed.
Result = perform_bridge_changes([ Result = perform_bridge_changes([
#{action => fun emqx_bridge_resource:remove/4, data => Removed}, #{action => fun emqx_bridge_resource:remove/4, action_name => remove, data => Removed},
#{ #{
action => fun emqx_bridge_resource:create/4, action => fun emqx_bridge_resource:create/4,
action_name => create,
data => Added, data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4 on_exception_fn => fun emqx_bridge_resource:remove/4
}, },
#{action => fun emqx_bridge_resource:update/4, data => Updated} #{action => fun emqx_bridge_resource:update/4, action_name => update, data => Updated}
]), ]),
ok = unload_hook(), ok = unload_hook(),
ok = load_hook(NewConf), ok = load_hook(NewConf),
@ -534,28 +534,21 @@ convert_certs(BridgesConf) ->
). ).
perform_bridge_changes(Tasks) -> perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok). perform_bridge_changes(Tasks, []).
perform_bridge_changes([], Result) -> perform_bridge_changes([], Errors) ->
Result; case Errors of
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> [] -> ok;
_ -> {error, Errors}
end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold( Results = emqx_utils:pmap(
fun fun({{Type, Name}, Conf}) ->
({_Type, _Name}, _Conf, {error, Reason}) -> ResOpts = creation_opts(Conf),
{error, Reason}; Res =
%% for emqx_bridge_resource:update/4 try
({Type, Name}, {OldConf, Conf}, _) -> Action(Type, Name, Conf, ResOpts)
ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, {OldConf, Conf}, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
try Action(Type, Name, Conf, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
catch catch
Kind:Error:Stacktrace -> Kind:Error:Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{
@ -567,13 +560,34 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R
stacktrace => Stacktrace stacktrace => Stacktrace
}), }),
OnException(Type, Name, Conf, ResOpts), OnException(Type, Name, Conf, ResOpts),
erlang:raise(Kind, Error, Stacktrace) {error, Error}
end end,
{{Type, Name}, Res}
end, end,
Result0, maps:to_list(MapConfs),
MapConfs infinity
), ),
perform_bridge_changes(Tasks, Result). Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_bridge_changes(Tasks, Errors).
creation_opts({_OldConf, Conf}) ->
emqx_resource:fetch_creation_opts(Conf);
creation_opts(Conf) ->
emqx_resource:fetch_creation_opts(Conf).
diff_confs(NewConfs, OldConfs) -> diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps( emqx_utils_maps:diff_maps(

View File

@ -1059,7 +1059,6 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when
-> ->
#{added := Added, removed := Removed, changed := Updated} = #{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf), diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed.
RemoveFun = fun(Type, Name, Conf) -> RemoveFun = fun(Type, Name, Conf) ->
uninstall_bridge_v2(ConfRootKey, Type, Name, Conf) uninstall_bridge_v2(ConfRootKey, Type, Name, Conf)
end, end,
@ -1071,13 +1070,14 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when
install_bridge_v2(ConfRootKey, Type, Name, Conf) install_bridge_v2(ConfRootKey, Type, Name, Conf)
end, end,
Result = perform_bridge_changes([ Result = perform_bridge_changes([
#{action => RemoveFun, data => Removed}, #{action => RemoveFun, action_name => remove, data => Removed},
#{ #{
action => CreateFun, action => CreateFun,
action_name => create,
data => Added, data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4 on_exception_fn => fun emqx_bridge_resource:remove/4
}, },
#{action => UpdateFun, data => Updated} #{action => UpdateFun, action_name => update, data => Updated}
]), ]),
reload_message_publish_hook(NewConf), reload_message_publish_hook(NewConf),
?tp(bridge_post_config_update_done, #{}), ?tp(bridge_post_config_update_done, #{}),
@ -1141,26 +1141,20 @@ do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
perform_bridge_changes(Tasks) -> perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok). perform_bridge_changes(Tasks, []).
perform_bridge_changes([], Result) -> perform_bridge_changes([], Errors) ->
Result; case Errors of
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> [] -> ok;
_ -> {error, Errors}
end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold( Results = emqx_utils:pmap(
fun fun({{Type, Name}, Conf}) ->
({_Type, _Name}, _Conf, {error, Reason}) -> Res =
{error, Reason}; try
%% for update Action(Type, Name, Conf)
({Type, Name}, {OldConf, Conf}, _) ->
case Action(Type, Name, {OldConf, Conf}) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
try Action(Type, Name, Conf) of
{error, Reason} -> {error, Reason};
Return -> Return
catch catch
Kind:Error:Stacktrace -> Kind:Error:Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{
@ -1172,13 +1166,29 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R
stacktrace => Stacktrace stacktrace => Stacktrace
}), }),
OnException(Type, Name, Conf), OnException(Type, Name, Conf),
erlang:raise(Kind, Error, Stacktrace) {error, Error}
end end,
{{Type, Name}, Res}
end, end,
Result0, maps:to_list(MapConfs),
MapConfs infinity
), ),
perform_bridge_changes(Tasks, Result). Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_bridge_changes(Tasks, Errors).
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),

View File

@ -606,12 +606,22 @@ t_load_no_matching_connector(_Config) ->
}, },
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {post_config_update, _HandlerMod, [
bridge_name := my_test_bridge_update, #{
connector_name := <<"unknown">>, errors := [
bridge_type := _, {
reason := <<"connector_not_found_or_wrong_type">> {_, my_test_bridge_update},
}}}, {error, #{
bridge_name := my_test_bridge_update,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}
}
],
action := update
}
]}},
update_root_config(RootConf0) update_root_config(RootConf0)
), ),
@ -623,12 +633,22 @@ t_load_no_matching_connector(_Config) ->
}, },
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {post_config_update, _HandlerMod, [
bridge_name := my_test_bridge_new, #{
connector_name := <<"unknown">>, errors := [
bridge_type := _, {
reason := <<"connector_not_found_or_wrong_type">> {_, my_test_bridge_new},
}}}, {error, #{
bridge_name := my_test_bridge_new,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}
}
],
action := create
}
]}},
update_root_config(RootConf1) update_root_config(RootConf1)
), ),

View File

@ -169,16 +169,16 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
ok. ok.
%% The config update will be failed if any task in `perform_connector_changes` failed.
perform_connector_changes(Removed, Added, Updated) -> perform_connector_changes(Removed, Added, Updated) ->
Result = perform_connector_changes([ Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed}, #{action => fun emqx_connector_resource:remove/4, action_name => remove, data => Removed},
#{ #{
action => fun emqx_connector_resource:create/4, action => fun emqx_connector_resource:create/4,
action_name => create,
data => Added, data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4 on_exception_fn => fun emqx_connector_resource:remove/4
}, },
#{action => fun emqx_connector_resource:update/4, data => Updated} #{action => fun emqx_connector_resource:update/4, action_name => update, data => Updated}
]), ]),
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
Result. Result.
@ -351,28 +351,21 @@ convert_certs(ConnectorsConf) ->
). ).
perform_connector_changes(Tasks) -> perform_connector_changes(Tasks) ->
perform_connector_changes(Tasks, ok). perform_connector_changes(Tasks, []).
perform_connector_changes([], Result) -> perform_connector_changes([], Errors) ->
Result; case Errors of
perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> [] -> ok;
_ -> {error, Errors}
end;
perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold( Results = emqx_utils:pmap(
fun fun({{Type, Name}, Conf}) ->
({_Type, _Name}, _Conf, {error, Reason}) -> ResOpts = creation_opts(Conf),
{error, Reason}; Res =
%% for emqx_connector_resource:update/4 try
({Type, Name}, {OldConf, Conf}, _) -> Action(Type, Name, Conf, ResOpts)
ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, {OldConf, Conf}, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
try Action(Type, Name, Conf, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
catch catch
Kind:Error:Stacktrace -> Kind:Error:Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{
@ -384,13 +377,34 @@ perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks]
stacktrace => Stacktrace stacktrace => Stacktrace
}), }),
OnException(Type, Name, Conf, ResOpts), OnException(Type, Name, Conf, ResOpts),
erlang:raise(Kind, Error, Stacktrace) {error, Error}
end end,
{{Type, Name}, Res}
end, end,
Result0, maps:to_list(MapConfs),
MapConfs infinity
), ),
perform_connector_changes(Tasks, Result). Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_connector_changes(Tasks, Errors).
creation_opts({_OldConf, Conf}) ->
emqx_resource:fetch_creation_opts(Conf);
creation_opts(Conf) ->
emqx_resource:fetch_creation_opts(Conf).
diff_confs(NewConfs, OldConfs) -> diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps( emqx_utils_maps:diff_maps(

View File

@ -0,0 +1,3 @@
Apply post config bridge changes in parallel.
This can greatly improve the performance when multiple bridges are being changed,
e.g. when a backup file is being imported.