Merge pull request #12354 from SergeTupchiy/EMQX-11751-dashboard-import-timeout-rel55

Apply brdige post config changes in parallel
This commit is contained in:
JianBo He 2024-01-22 17:24:55 +08:00 committed by GitHub
commit 2706e005ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 209 additions and 121 deletions

View File

@ -103,33 +103,37 @@
load() ->
Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
lists:foreach(
emqx_utils:pforeach(
fun({Name, Conf}) ->
%% fetch opts for `emqx_resource_buffer_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts)
end,
maps:to_list(NamedConf)
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Bridges)
maps:to_list(Bridges),
infinity
).
unload() ->
unload_hook(),
Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
lists:foreach(
emqx_utils:pforeach(
fun({Name, _Conf}) ->
_ = emqx_bridge_resource:stop(Type, Name)
end,
maps:to_list(NamedConf)
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Bridges)
maps:to_list(Bridges),
infinity
).
safe_load_bridge(Type, Name, Conf, Opts) ->
@ -284,15 +288,15 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed.
Result = perform_bridge_changes([
#{action => fun emqx_bridge_resource:remove/4, data => Removed},
#{action => fun emqx_bridge_resource:remove/4, action_name => remove, data => Removed},
#{
action => fun emqx_bridge_resource:create/4,
action_name => create,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => fun emqx_bridge_resource:update/4, data => Updated}
#{action => fun emqx_bridge_resource:update/4, action_name => update, data => Updated}
]),
ok = unload_hook(),
ok = load_hook(NewConf),
@ -534,28 +538,21 @@ convert_certs(BridgesConf) ->
).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
perform_bridge_changes(Tasks, []).
perform_bridge_changes([], Result) ->
Result;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) ->
perform_bridge_changes([], Errors) ->
case Errors of
[] -> ok;
_ -> {error, Errors}
end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold(
fun
({_Type, _Name}, _Conf, {error, Reason}) ->
{error, Reason};
%% for emqx_bridge_resource:update/4
({Type, Name}, {OldConf, Conf}, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, {OldConf, Conf}, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
try Action(Type, Name, Conf, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
Results = emqx_utils:pmap(
fun({{Type, Name}, Conf}) ->
ResOpts = creation_opts(Conf),
Res =
try
Action(Type, Name, Conf, ResOpts)
catch
Kind:Error:Stacktrace ->
?SLOG(error, #{
@ -567,13 +564,34 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R
stacktrace => Stacktrace
}),
OnException(Type, Name, Conf, ResOpts),
erlang:raise(Kind, Error, Stacktrace)
end
{error, Error}
end,
{{Type, Name}, Res}
end,
Result0,
MapConfs
maps:to_list(MapConfs),
infinity
),
perform_bridge_changes(Tasks, Result).
Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_bridge_changes(Tasks, Errors).
creation_opts({_OldConf, Conf}) ->
emqx_resource:fetch_creation_opts(Conf);
creation_opts(Conf) ->
emqx_resource:fetch_creation_opts(Conf).
diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps(

View File

@ -182,17 +182,20 @@ load() ->
load_bridges(RootName) ->
Bridges = emqx:get_config([RootName], #{}),
lists:foreach(
_ = emqx_utils:pmap(
fun({Type, Bridge}) ->
lists:foreach(
emqx_utils:pmap(
fun({Name, BridgeConf}) ->
install_bridge_v2(RootName, Type, Name, BridgeConf)
end,
maps:to_list(Bridge)
maps:to_list(Bridge),
infinity
)
end,
maps:to_list(Bridges)
).
maps:to_list(Bridges),
infinity
),
ok.
unload() ->
unload_bridges(?ROOT_KEY_ACTIONS),
@ -204,17 +207,20 @@ unload() ->
unload_bridges(ConfRooKey) ->
Bridges = emqx:get_config([ConfRooKey], #{}),
lists:foreach(
_ = emqx_utils:pmap(
fun({Type, Bridge}) ->
lists:foreach(
emqx_utils:pmap(
fun({Name, BridgeConf}) ->
uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf)
end,
maps:to_list(Bridge)
maps:to_list(Bridge),
infinity
)
end,
maps:to_list(Bridges)
).
maps:to_list(Bridges),
infinity
),
ok.
%%====================================================================
%% CRUD API
@ -1059,7 +1065,6 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when
->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed.
RemoveFun = fun(Type, Name, Conf) ->
uninstall_bridge_v2(ConfRootKey, Type, Name, Conf)
end,
@ -1071,13 +1076,14 @@ post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when
install_bridge_v2(ConfRootKey, Type, Name, Conf)
end,
Result = perform_bridge_changes([
#{action => RemoveFun, data => Removed},
#{action => RemoveFun, action_name => remove, data => Removed},
#{
action => CreateFun,
action_name => create,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => UpdateFun, data => Updated}
#{action => UpdateFun, action_name => update, data => Updated}
]),
reload_message_publish_hook(NewConf),
?tp(bridge_post_config_update_done, #{}),
@ -1141,26 +1147,20 @@ do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
perform_bridge_changes(Tasks, []).
perform_bridge_changes([], Result) ->
Result;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) ->
perform_bridge_changes([], Errors) ->
case Errors of
[] -> ok;
_ -> {error, Errors}
end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold(
fun
({_Type, _Name}, _Conf, {error, Reason}) ->
{error, Reason};
%% for update
({Type, Name}, {OldConf, Conf}, _) ->
case Action(Type, Name, {OldConf, Conf}) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
try Action(Type, Name, Conf) of
{error, Reason} -> {error, Reason};
Return -> Return
Results = emqx_utils:pmap(
fun({{Type, Name}, Conf}) ->
Res =
try
Action(Type, Name, Conf)
catch
Kind:Error:Stacktrace ->
?SLOG(error, #{
@ -1172,13 +1172,29 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], R
stacktrace => Stacktrace
}),
OnException(Type, Name, Conf),
erlang:raise(Kind, Error, Stacktrace)
end
{error, Error}
end,
{{Type, Name}, Res}
end,
Result0,
MapConfs
maps:to_list(MapConfs),
infinity
),
perform_bridge_changes(Tasks, Result).
Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_bridge_changes(Tasks, Errors).
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),

View File

@ -606,12 +606,22 @@ t_load_no_matching_connector(_Config) ->
},
?assertMatch(
{error,
{post_config_update, _HandlerMod, #{
bridge_name := my_test_bridge_update,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}},
{post_config_update, _HandlerMod, [
#{
errors := [
{
{_, my_test_bridge_update},
{error, #{
bridge_name := my_test_bridge_update,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}
}
],
action := update
}
]}},
update_root_config(RootConf0)
),
@ -623,12 +633,22 @@ t_load_no_matching_connector(_Config) ->
},
?assertMatch(
{error,
{post_config_update, _HandlerMod, #{
bridge_name := my_test_bridge_new,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}},
{post_config_update, _HandlerMod, [
#{
errors := [
{
{_, my_test_bridge_new},
{error, #{
bridge_name := my_test_bridge_new,
connector_name := <<"unknown">>,
bridge_type := _,
reason := <<"connector_not_found_or_wrong_type">>
}}
}
],
action := create
}
]}},
update_root_config(RootConf1)
),

View File

@ -54,30 +54,34 @@
load() ->
Connectors = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
lists:foreach(
emqx_utils:pforeach(
fun({Name, Conf}) ->
safe_load_connector(Type, Name, Conf)
end,
maps:to_list(NamedConf)
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Connectors)
maps:to_list(Connectors),
infinity
).
unload() ->
Connectors = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
lists:foreach(
emqx_utils:pforeach(
fun({Name, _Conf}) ->
_ = emqx_connector_resource:stop(Type, Name)
end,
maps:to_list(NamedConf)
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Connectors)
maps:to_list(Connectors),
infinity
).
safe_load_connector(Type, Name, Conf) ->
@ -169,16 +173,16 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
?tp(connector_post_config_update_done, #{}),
ok.
%% The config update will be failed if any task in `perform_connector_changes` failed.
perform_connector_changes(Removed, Added, Updated) ->
Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed},
#{action => fun emqx_connector_resource:remove/4, action_name => remove, data => Removed},
#{
action => fun emqx_connector_resource:create/4,
action_name => create,
data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4
},
#{action => fun emqx_connector_resource:update/4, data => Updated}
#{action => fun emqx_connector_resource:update/4, action_name => update, data => Updated}
]),
?tp(connector_post_config_update_done, #{}),
Result.
@ -351,28 +355,21 @@ convert_certs(ConnectorsConf) ->
).
perform_connector_changes(Tasks) ->
perform_connector_changes(Tasks, ok).
perform_connector_changes(Tasks, []).
perform_connector_changes([], Result) ->
Result;
perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) ->
perform_connector_changes([], Errors) ->
case Errors of
[] -> ok;
_ -> {error, Errors}
end;
perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Result = maps:fold(
fun
({_Type, _Name}, _Conf, {error, Reason}) ->
{error, Reason};
%% for emqx_connector_resource:update/4
({Type, Name}, {OldConf, Conf}, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, {OldConf, Conf}, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
try Action(Type, Name, Conf, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
Results = emqx_utils:pmap(
fun({{Type, Name}, Conf}) ->
ResOpts = creation_opts(Conf),
Res =
try
Action(Type, Name, Conf, ResOpts)
catch
Kind:Error:Stacktrace ->
?SLOG(error, #{
@ -384,13 +381,34 @@ perform_connector_changes([#{action := Action, data := MapConfs} = Task | Tasks]
stacktrace => Stacktrace
}),
OnException(Type, Name, Conf, ResOpts),
erlang:raise(Kind, Error, Stacktrace)
end
{error, Error}
end,
{{Type, Name}, Res}
end,
Result0,
MapConfs
maps:to_list(MapConfs),
infinity
),
perform_connector_changes(Tasks, Result).
Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_connector_changes(Tasks, Errors).
creation_opts({_OldConf, Conf}) ->
emqx_resource:fetch_creation_opts(Conf);
creation_opts(Conf) ->
emqx_resource:fetch_creation_opts(Conf).
diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps(

View File

@ -51,6 +51,8 @@
gen_id/0,
gen_id/1,
explain_posix/1,
pforeach/2,
pforeach/3,
pmap/2,
pmap/3,
readable_error_msg/1,
@ -423,6 +425,15 @@ explain_posix(estale) -> "Stale remote file handle";
explain_posix(exdev) -> "Cross-domain link";
explain_posix(NotPosix) -> NotPosix.
-spec pforeach(fun((A) -> term()), list(A)) -> ok.
pforeach(Fun, List) when is_function(Fun, 1), is_list(List) ->
pforeach(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
-spec pforeach(fun((A) -> term()), list(A), timeout()) -> ok.
pforeach(Fun, List, Timeout) ->
_ = pmap(Fun, List, Timeout),
ok.
%% @doc Like lists:map/2, only the callback function is evaluated
%% concurrently.
-spec pmap(fun((A) -> B), list(A)) -> list(B).
@ -431,7 +442,9 @@ pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
pmap(Fun, List, Timeout) when
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
is_function(Fun, 1),
is_list(List),
(is_integer(Timeout) andalso Timeout >= 0 orelse Timeout =:= infinity)
->
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).

View File

@ -0,0 +1,3 @@
Apply post config bridge changes in parallel.
This can greatly improve the performance when multiple bridges are being changed,
e.g. when a backup file is being imported.