Fix: apps should boots after emqx_machine_sup's children spawn. (#5851)

* fix(cluster_call): apps should start after cluster_call init

* fix: undef function

* chore: reformat code

* fix: ekka must start before emqx
This commit is contained in:
zhongwencool 2021-09-30 13:56:18 +08:00 committed by GitHub
parent 1c8656dd0b
commit be123f613d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 157 deletions

View File

@ -28,6 +28,13 @@
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
-define(CM_SHARD, emqx_cm_shard).
-define(EMQX_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
, ?RULE_ENGINE_SHARD
, ?MOD_DELAYED_SHARD
]).
%%--------------------------------------------------------------------
%% Banner
%%--------------------------------------------------------------------

View File

@ -24,6 +24,7 @@
, get_description/0
, get_release/0
, set_init_config_load_done/0
, get_init_config_load_done/0
, set_override_conf_file/1
]).
@ -33,24 +34,14 @@
-define(APP, emqx).
-define(EMQX_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
, ?RULE_ENGINE_SHARD
, ?MOD_DELAYED_SHARD
]).
%%--------------------------------------------------------------------
%% Application callbacks
%%--------------------------------------------------------------------
start(_Type, _Args) ->
ok = maybe_load_config(),
%% Load application first for ekka_mnesia scanner
ekka:start(),
ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity),
ok = maybe_start_quicer(),
start_ekka(),
{ok, Sup} = emqx_sup:start_link(),
ok = maybe_start_listeners(),
ok = emqx_alarm_handler:load(),
@ -70,15 +61,18 @@ stop(_State) -> ok.
set_init_config_load_done() ->
application:set_env(emqx, init_config_load_done, true).
get_init_config_load_done() ->
application:get_env(emqx, init_config_load_done, false).
%% @doc This API is mostly for testing.
%% The override config file is typically located in the 'data' dir when
%% it is a emqx release, but emqx app should not have to konw where the
%% it is a emqx release, but emqx app should not have to know where the
%% 'data' dir is located.
set_override_conf_file(File) ->
application:set_env(emqx, override_conf_file, File).
maybe_load_config() ->
case application:get_env(emqx, init_config_load_done, false) of
case get_init_config_load_done() of
true ->
ok;
false ->
@ -86,6 +80,11 @@ maybe_load_config() ->
ConfFiles = application:get_env(emqx, config_files, []),
emqx_config:init_load(emqx_schema, ConfFiles)
end.
%% @doc This API is mostly for testing
%% we already start ekka in emqx_machine
start_ekka() ->
ekka:start(),
ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity).
maybe_start_listeners() ->
case emqx_boot:is_enabled(listeners) of

View File

@ -21,17 +21,8 @@
, is_ready/0
]).
-export([ stop_apps/1
, ensure_apps_started/0
]).
-export([sorted_reboot_apps/0]).
-ifdef(TEST).
-export([sorted_reboot_apps/1]).
-endif.
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
%% @doc EMQ X boot entrypoint.
start() ->
@ -45,14 +36,10 @@ start() ->
ok = print_otp_version_warning(),
ok = load_config_files(),
ok = ensure_apps_started(),
_ = emqx_plugins:load(),
ok = print_vsn(),
ok = start_autocluster().
%% Load application first for ekka_mnesia scanner
ekka:start(),
ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity),
ok.
graceful_shutdown() ->
emqx_machine_terminator:graceful_wait().
@ -74,13 +61,6 @@ print_otp_version_warning() ->
[?OTP_RELEASE]).
-endif. % OTP_RELEASE > 22
-ifdef(TEST).
print_vsn() -> ok.
-else. % TEST
print_vsn() ->
?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-endif. % TEST
load_config_files() ->
%% the app env 'config_files' for 'emqx` app should be set
%% in app.time.config by boot script before starting Erlang VM
@ -89,114 +69,3 @@ load_config_files() ->
ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
%% to avoid config being loaded again when emqx app starts.
ok = emqx_app:set_init_config_load_done().
start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:stop_apps/1),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok.
stop_apps(Reason) ->
?SLOG(info, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
try
_ = application:stop(App)
catch
C : E ->
?SLOG(error, #{msg => "failed_to_stop_app",
app => App,
exception => C,
reason => E})
end.
ensure_apps_started() ->
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
case application:ensure_all_started(App) of
{ok, Apps} ->
?SLOG(debug, #{msg => "started_apps", apps => Apps});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({failed_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
%% 1. due to static static config change
%% 2. after join a cluster
reboot_apps() ->
[ gproc
, esockd
, ranch
, cowboy
, emqx
, emqx_prometheus
, emqx_modules
, emqx_dashboard
, emqx_connector
, emqx_gateway
, emqx_statsd
, emqx_resource
, emqx_rule_engine
, emqx_bridge
, emqx_bridge_mqtt
, emqx_plugin_libs
, emqx_management
, emqx_retainer
, emqx_exhook
, emqx_authn
, emqx_authz
, emqx_psk
].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps).
app_deps(App) ->
case application:get_key(App, applications) of
undefined -> [];
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
try
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
Sorted;
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
end
after
digraph:delete(G)
end.
add_app(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
add_app(G, App, []);
add_app(_G, _App, []) ->
ok;
add_app(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).

View File

@ -0,0 +1,152 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_boot).
-include_lib("emqx/include/logger.hrl").
-export([post_boot/0]).
-export([stop_apps/1, ensure_apps_started/0]).
-export([sorted_reboot_apps/0]).
-export([start_autocluster/0]).
-ifdef(TEST).
-export([sorted_reboot_apps/1]).
-endif.
post_boot() ->
ok = ensure_apps_started(),
_ = emqx_plugins:load(),
ok = print_vsn(),
ok = start_autocluster(),
ignore.
-ifdef(TEST).
print_vsn() -> ok.
-else. % TEST
print_vsn() ->
?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-endif. % TEST
start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:stop_apps/1),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok.
stop_apps(Reason) ->
?SLOG(info, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
try
_ = application:stop(App)
catch
C : E ->
?SLOG(error, #{msg => "failed_to_stop_app",
app => App,
exception => C,
reason => E})
end.
ensure_apps_started() ->
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
case application:ensure_all_started(App) of
{ok, Apps} ->
?SLOG(debug, #{msg => "started_apps", apps => Apps});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({failed_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
%% 1. due to static static config change
%% 2. after join a cluster
reboot_apps() ->
[ gproc
, esockd
, ranch
, cowboy
, emqx
, emqx_prometheus
, emqx_modules
, emqx_dashboard
, emqx_connector
, emqx_gateway
, emqx_statsd
, emqx_resource
, emqx_rule_engine
, emqx_bridge
, emqx_bridge_mqtt
, emqx_plugin_libs
, emqx_management
, emqx_retainer
, emqx_exhook
, emqx_authn
, emqx_authz
].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps).
app_deps(App) ->
case application:get_key(App, applications) of
undefined -> [];
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
try
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
Sorted;
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
end
after
digraph:delete(G)
end.
add_app(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
add_app(G, App, []);
add_app(_G, _App, []) ->
ok;
add_app(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).

View File

@ -33,7 +33,8 @@ init([]) ->
Terminator = child_worker(emqx_machine_terminator, [], transient),
ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent),
ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent),
Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler],
BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler, BootApps],
SupFlags = #{strategy => one_for_one,
intensity => 100,
period => 10
@ -41,8 +42,11 @@ init([]) ->
{ok, {SupFlags, Children}}.
child_worker(M, Args, Restart) ->
child_worker(M, start_link, Args, Restart).
child_worker(M, Func, Args, Restart) ->
#{id => M,
start => {M, start_link, Args},
start => {M, Func, Args},
restart => Restart,
shutdown => 5000,
type => worker,

View File

@ -80,7 +80,7 @@ handle_cast(_Cast, State) ->
handle_call(?DO_IT, _From, State) ->
try
emqx_machine:stop_apps(normal)
emqx_machine_boot:stop_apps(normal)
catch
C : E : St ->
Apps = [element(1, A) || A <- application:which_applications()],

View File

@ -33,9 +33,9 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_shutdown_reboot(_Config) ->
emqx_machine:stop_apps(normal),
emqx_machine_boot:stop_apps(normal),
false = emqx:is_running(node()),
emqx_machine:ensure_apps_started(),
emqx_machine_boot:ensure_apps_started(),
true = emqx:is_running(node()),
ok = emqx_machine:stop_apps(for_test),
ok = emqx_machine_boot:stop_apps(for_test),
false = emqx:is_running(node()).

View File

@ -38,7 +38,7 @@ sorted_reboot_apps_cycle_test() ->
check_order(Apps) ->
AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])),
Sorted = emqx_machine:sorted_reboot_apps(Apps),
Sorted = emqx_machine_boot:sorted_reboot_apps(Apps),
case length(AllApps) =:= length(Sorted) of
true -> ok;
false -> error({AllApps, Sorted})