refactor(emqx): ensure app shutdown and reboot order

make use of application dependency to ensure start order.
also the same dependency top-sorted for shutdown and reboot order
This commit is contained in:
Zaiming Shi 2021-08-03 00:39:06 +02:00
parent ff006abd5c
commit e6232665a3
21 changed files with 160 additions and 120 deletions

View File

@ -6,8 +6,3 @@ EMQX_ZONES__DEFAULT__LISTENERS__MQTT_WS__PROXY_PROTOCOL=true
EMQX_LOG__CONSOLE_HANDLER__ENABLE=true
EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug
EMQX_LOG__PRIMARY_LEVEL=debug
EMQX_ZONES__DEFAULT__MQTT__MAX_TOPIC_ALIAS=10
EMQX_ZONES__DEFAULT__MQTT__RETRY_INTERVAL=2s
HOCON_ENV_OVERRIDE_PREFIX=EMQX_
EMQX_LOG__PRIMARY_LEVEL=debug
EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug

View File

@ -23,7 +23,6 @@
%% Start/Stop the application
-export([ start/0
, restart/1
, is_running/1
, stop/0
]).
@ -52,12 +51,6 @@
, run_fold_hook/3
]).
%% Shutdown and reboot
-export([ shutdown/0
, shutdown/1
, reboot/0
]).
%% Troubleshooting
-export([ set_debug_secret/1
]).
@ -94,19 +87,8 @@ set_debug_secret(PathToSecretFile) ->
%% @doc Start emqx application
-spec(start() -> {ok, list(atom())} | {error, term()}).
start() ->
%% Check OS
%% Check VM
%% Check Mnesia
application:ensure_all_started(?APP).
-spec(restart(string()) -> ok).
restart(ConfFile) ->
reload_config(ConfFile),
shutdown(),
ok = application:stop(mnesia),
_ = application:start(mnesia),
reboot().
%% @doc Stop emqx application.
-spec(stop() -> ok | {error, term()}).
stop() ->
@ -202,43 +184,3 @@ run_hook(HookPoint, Args) ->
-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
run_fold_hook(HookPoint, Args, Acc) ->
emqx_hooks:run_fold(HookPoint, Args, Acc).
%%--------------------------------------------------------------------
%% Shutdown and reboot
%%--------------------------------------------------------------------
shutdown() ->
shutdown(normal).
shutdown(Reason) ->
?SLOG(critical, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_app/1, lists:reverse(default_started_applications())).
stop_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
application:stop(App).
reboot() ->
lists:foreach(fun application:start/1 , default_started_applications()).
default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, quicer, emqx] ++ emqx_feature().
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
reload_config(ConfFile) ->
{ok, [Conf]} = file:consult(ConfFile),
lists:foreach(fun({App, Vals}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Vals]
end, Conf).
-ifndef(EMQX_DEP_APPS).
emqx_feature() -> [].
-else.
emqx_feature() ->
?EMQX_DEP_APPS.
-endif.

View File

@ -53,7 +53,6 @@ start(_Type, _Args) ->
ok = maybe_start_quicer(),
{ok, Sup} = emqx_sup:start_link(),
ok = maybe_start_listeners(),
ok = start_autocluster(),
ok = emqx_alarm_handler:load(),
register(emqx, self()),
{ok, Sup}.
@ -132,9 +131,3 @@ get_release() ->
release_in_macro() ->
element(2, ?EMQX_RELEASE).
start_autocluster() ->
ekka:callback(prepare, fun emqx:shutdown/1),
ekka:callback(reboot, fun emqx:reboot/0),
_ = ekka:autocluster(?APP), %% returns 'ok' or a pid or 'any()' as in spec
ok.

View File

@ -32,25 +32,6 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_restart(_) ->
ConfFile = "test.config",
Data = "[{emqx_statsd,[{interval,15000},{push_gateway,\"http://127.0.0.1:9091\"}]}].",
file:write_file(ConfFile, list_to_binary(Data)),
emqx:restart(ConfFile),
file:delete(ConfFile).
t_stop_start(_) ->
emqx:stop(),
false = emqx:is_running(node()),
emqx:start(),
true = emqx:is_running(node()),
ok = emqx:shutdown(),
false = emqx:is_running(node()),
ok = emqx:reboot(),
true = emqx:is_running(node()),
ok = emqx:shutdown(for_test),
false = emqx:is_running(node()).
t_emqx_pubsub_api(_) ->
true = emqx:is_running(node()),
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,replayq,emqtt]},
{applications, [kernel,stdlib,replayq,emqtt,emqx]},
{mod, {emqx_bridge_mqtt_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -12,7 +12,8 @@
eredis,
epgsql,
mysql,
mongodb
mongodb,
emqx
]},
{env,[]},
{modules, []},

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]},
{applications, [kernel,stdlib,mnesia,minirest,emqx]},
{mod, {emqx_dashboard_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -5,7 +5,8 @@
{mod, {emqx_data_bridge_app, []}},
{applications,
[kernel,
stdlib
stdlib,
emqx
]},
{env,[]},
{modules, []},

View File

@ -4,7 +4,7 @@
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},
{applications, [kernel,stdlib,grpc]},
{applications, [kernel,stdlib,grpc,emqx]},
{env,[]},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},

View File

@ -3,7 +3,7 @@
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, lwm2m_coap]},
{applications, [kernel, stdlib, grpc, lwm2m_coap, emqx]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -21,6 +21,13 @@
, prep_stop/1
]).
%% Shutdown and reboot
-export([ shutdown/1
, ensure_apps_started/0
]).
-export([sorted_reboot_apps/0]).
-behaviour(application).
-include_lib("emqx/include/logger.hrl").
@ -28,18 +35,21 @@
start(_Type, _Args) ->
ok = set_backtrace_depth(),
ok = print_otp_version_warning(),
_ = load_modules(),
%% need to load some app envs
%% TODO delete it once emqx boot does not depend on modules envs
_ = load_modules(),
ok = load_config_files(),
{ok, RootSupPid} = emqx_machine_sup:start_link(),
{ok, _} = application:ensure_all_started(emqx),
ok = ensure_apps_started(),
_ = emqx_plugins:load(),
_ = start_modules(),
ok = print_vsn(),
ok = start_autocluster(),
{ok, RootSupPid}.
prep_stop(_State) ->
@ -71,13 +81,9 @@ print_vsn() ->
-ifndef(EMQX_ENTERPRISE).
load_modules() ->
application:load(emqx_modules).
start_modules() ->
application:ensure_all_started(emqx_modules).
-else.
load_modules() ->
ok.
start_modules() ->
ok.
-endif.
load_config_files() ->
@ -88,3 +94,83 @@ load_config_files() ->
ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
%% to avoid config being loaded again when emqx app starts.
ok = emqx_app:set_init_config_load_done().
start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:shutdown/1),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok.
shutdown(Reason) ->
?SLOG(critical, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
application:stop(App).
ensure_apps_started() ->
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
case application:ensure_all_started(App) of
{ok, Apps} ->
?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({faile_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
%% 1. due to static static config change
%% 2. after join a cluster
reboot_apps() ->
[gproc, esockd, ranch, cowboy, ekka, quicer, emqx | ?EMQX_DEP_APPS].
%% quicer can not be added to emqx's .app because it might be opted out at build time
implicit_deps() ->
[{emqx, [quicer]}].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps ++ implicit_deps()).
app_deps(App) ->
case application:get_key(App, applications) of
undefined -> [];
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
Sorted;
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
end.
add_app(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
add_app(G, App, []);
add_app(_G, _App, []) ->
ok;
add_app(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).

View File

@ -14,6 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This supervisor manages workers which should never need a restart
%% due to config changes or when joining a cluster.
-module(emqx_machine_sup).
-behaviour(supervisor).
@ -35,15 +37,6 @@ init([]) ->
},
{ok, {SupFlags, Children}}.
% child_supervisor(Mod) ->
% #{id => Mod,
% start => {Mod, start_link, []},
% restart => permanent,
% shutdown => infinity,
% type => supervisor,
% modules => [Mod]
% }.
child_worker(M, Args) ->
#{id => M,
start => {M, start_link, Args},

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_app_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_shutdown_reboot(_Config) ->
emqx_machine_app:shutdown(normal),
false = emqx:is_running(node()),
emqx_machine_app:ensure_apps_started(),
true = emqx:is_running(node()),
ok = emqx_machine_app:shutdown(for_test),
false = emqx:is_running(node()).

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest]},
{applications, [kernel,stdlib,minirest,emqx]},
{mod, {emqx_mgmt_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -2,7 +2,7 @@
[{description, "EMQ X Modules"},
{vsn, "5.0.0"},
{modules, []},
{applications, [kernel,stdlib]},
{applications, [kernel,stdlib,emqx]},
{mod, {emqx_modules_app, []}},
{registered, [emqx_modules_sup]},
{env, []}

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_prometheus_sup]},
{applications, [kernel,stdlib,prometheus]},
{applications, [kernel,stdlib,prometheus,emqx]},
{mod, {emqx_prometheus_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -8,7 +8,8 @@
stdlib,
gproc,
hocon,
jsx
jsx,
emqx
]},
{env,[]},
{modules, []},

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},
{applications, [kernel,stdlib,emqx]},
{mod, {emqx_retainer_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -3,7 +3,7 @@
{vsn, "5.0.0"},
{registered, []},
{applications,
[kernel,stdlib]},
[kernel,stdlib,emqx]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -6,7 +6,8 @@
{applications,
[kernel,
stdlib,
estatsd
estatsd,
emqx
]},
{env,[]},
{modules, []},

View File

@ -458,8 +458,13 @@ coveralls() ->
app_names() -> list_dir("apps") ++ list_dir("lib-ee").
list_dir(Dir) ->
{ok, Names} = file:list_dir(Dir),
[list_to_atom(Name) || Name <- Names, filelib:is_dir(filename:join([Dir, Name]))].
case filelib:is_dir(Dir) of
true ->
{ok, Names} = file:list_dir(Dir),
[list_to_atom(Name) || Name <- Names, filelib:is_dir(filename:join([Dir, Name]))];
false ->
[]
end.
%% ==== Enterprise supports below ==================================================================