Merge pull request #5414 from zmstone/fix-graceful-shutdown

feat: add graceful shutdown
This commit is contained in:
Zaiming (Stone) Shi 2021-08-06 13:53:18 +02:00 committed by GitHub
commit e162b42b40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 449 additions and 159 deletions

View File

@ -138,7 +138,7 @@ EOF
exit 1
fi
IDLE_TIME=0
while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do
while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do
if [ $IDLE_TIME -gt 10 ]
then
echo "emqx running error"

View File

@ -0,0 +1,187 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine).
-export([ start/0
, graceful_shutdown/0
, is_ready/0
]).
-export([ stop_apps/1
, ensure_apps_started/0
]).
-export([sorted_reboot_apps/0]).
-ifdef(TEST).
-export([sorted_reboot_apps/1]).
-endif.
-include_lib("emqx/include/logger.hrl").
%% @doc EMQ X boot entrypoint.
start() ->
os:set_signal(sighup, ignore),
os:set_signal(sigterm, handle), %% default is handle
ok = set_backtrace_depth(),
ok = print_otp_version_warning(),
%% need to load some app envs
%% TODO delete it once emqx boot does not depend on modules envs
_ = load_modules(),
ok = load_config_files(),
ok = ensure_apps_started(),
_ = emqx_plugins:load(),
ok = print_vsn(),
ok = start_autocluster().
graceful_shutdown() ->
emqx_machine_terminator:graceful_wait().
set_backtrace_depth() ->
{ok, Depth} = application:get_env(emqx_machine, backtrace_depth),
_ = erlang:system_flag(backtrace_depth, Depth),
ok.
%% @doc Return true if boot is complete.
is_ready() ->
emqx_machine_terminator:is_running().
-if(?OTP_RELEASE > 22).
print_otp_version_warning() -> ok.
-else.
print_otp_version_warning() ->
?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n",
[?OTP_RELEASE]).
-endif. % OTP_RELEASE > 22
-ifdef(TEST).
print_vsn() -> ok.
-else. % TEST
print_vsn() ->
?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-endif. % TEST
-ifndef(EMQX_ENTERPRISE).
load_modules() ->
application:load(emqx_modules).
-else.
load_modules() ->
ok.
-endif.
load_config_files() ->
%% the app env 'config_files' for 'emqx` app should be set
%% in app.time.config by boot script before starting Erlang VM
ConfFiles = application:get_env(emqx, config_files, []),
%% emqx_machine_schema is a superset of emqx_schema
ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
%% to avoid config being loaded again when emqx app starts.
ok = emqx_app:set_init_config_load_done().
start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:stop_apps/1),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok.
stop_apps(Reason) ->
?SLOG(info, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
try
_ = application:stop(App)
catch
C : E ->
?SLOG(error, #{msg => "failed_to_stop_app",
app => App,
exception => C,
reason => E})
end.
ensure_apps_started() ->
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
case application:ensure_all_started(App) of
{ok, Apps} ->
?SLOG(debug, #{msg => "started_apps", apps => Apps});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({faile_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
%% 1. due to static static config change
%% 2. after join a cluster
reboot_apps() ->
[gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps).
app_deps(App) ->
case application:get_key(App, applications) of
undefined -> [];
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
try
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
Sorted;
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
end
after
digraph:delete(G)
end.
add_app(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
add_app(G, App, []);
add_app(_G, _App, []) ->
ok;
add_app(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).

View File

@ -18,155 +18,13 @@
-export([ start/2
, stop/1
, prep_stop/1
]).
%% Shutdown and reboot
-export([ shutdown/1
, ensure_apps_started/0
]).
-export([sorted_reboot_apps/0]).
-behaviour(application).
-include_lib("emqx/include/logger.hrl").
start(_Type, _Args) ->
ok = set_backtrace_depth(),
ok = print_otp_version_warning(),
%% need to load some app envs
%% TODO delete it once emqx boot does not depend on modules envs
_ = load_modules(),
ok = load_config_files(),
{ok, RootSupPid} = emqx_machine_sup:start_link(),
ok = ensure_apps_started(),
_ = emqx_plugins:load(),
ok = print_vsn(),
ok = start_autocluster(),
{ok, RootSupPid}.
prep_stop(_State) ->
application:stop(emqx).
ok = emqx_machine:start(),
emqx_machine_sup:start_link().
stop(_State) ->
ok.
set_backtrace_depth() ->
{ok, Depth} = application:get_env(emqx_machine, backtrace_depth),
_ = erlang:system_flag(backtrace_depth, Depth),
ok.
-if(?OTP_RELEASE > 22).
print_otp_version_warning() -> ok.
-else.
print_otp_version_warning() ->
?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n",
[?OTP_RELEASE]).
-endif. % OTP_RELEASE > 22
-ifdef(TEST).
print_vsn() -> ok.
-else. % TEST
print_vsn() ->
?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-endif. % TEST
-ifndef(EMQX_ENTERPRISE).
load_modules() ->
application:load(emqx_modules).
-else.
load_modules() ->
ok.
-endif.
load_config_files() ->
%% the app env 'config_files' for 'emqx` app should be set
%% in app.time.config by boot script before starting Erlang VM
ConfFiles = application:get_env(emqx, config_files, []),
%% emqx_machine_schema is a superset of emqx_schema
ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
%% to avoid config being loaded again when emqx app starts.
ok = emqx_app:set_init_config_load_done().
start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:shutdown/1),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok.
shutdown(Reason) ->
?SLOG(critical, #{msg => "stopping_apps", reason => Reason}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
application:stop(App).
ensure_apps_started() ->
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
case application:ensure_all_started(App) of
{ok, Apps} ->
?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({faile_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
%% 1. due to static static config change
%% 2. after join a cluster
reboot_apps() ->
[gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps).
app_deps(App) ->
case application:get_key(App, applications) of
undefined -> [];
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
Sorted;
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
end.
add_app(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
add_app(G, App, []);
add_app(_G, _App, []) ->
ok;
add_app(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).

View File

@ -0,0 +1,63 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% This module implements a gen_event handler which
%% swap-in replaces the default one from OTP.
%% The kill signal (sigterm) is captured so we can
%% perform graceful shutdown.
-module(emqx_machine_signal_handler).
-export([start/0, init/1, format_status/2,
handle_event/2, handle_call/2, handle_info/2,
terminate/2, code_change/3]).
-include_lib("emqx/include/logger.hrl").
start() ->
ok = gen_event:swap_sup_handler(
erl_signal_server,
{erl_signal_handler, []},
{?MODULE, []}).
init({[], _}) -> {ok, #{}}.
handle_event(sigterm, State) ->
?ULOG("Received terminate signal, shutting down now~n", []),
emqx_machine_terminator:graceful(),
{ok, State};
handle_event(Event, State) ->
%% delegate other events back to erl_signal_handler
%% erl_signal_handler does not make use of the State
%% so we can pass whatever from here
_ = erl_signal_handler:handle_event(Event, State),
{ok, State}.
handle_info(stop, State) ->
{ok, State};
handle_info(_Other, State) ->
{ok, State}.
handle_call(_Request, State) ->
{ok, ok, State}.
format_status(_Opt, [_Pdict,_S]) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Args, _State) ->
ok.

View File

@ -29,18 +29,19 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
GlobalGC = child_worker(emqx_global_gc, []),
Children = [GlobalGC],
SupFlags = #{strategy => one_for_all,
GlobalGC = child_worker(emqx_global_gc, [], permanent),
Terminator = child_worker(emqx_machine_terminator, [], transient),
Children = [GlobalGC, Terminator],
SupFlags = #{strategy => one_for_one,
intensity => 100,
period => 10
},
{ok, {SupFlags, Children}}.
child_worker(M, Args) ->
child_worker(M, Args, Restart) ->
#{id => M,
start => {M, start_link, Args},
restart => permanent,
restart => Restart,
shutdown => 5000,
type => worker,
modules => [M]

View File

@ -0,0 +1,115 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_terminator).
-behaviour(gen_server).
-export([ start_link/0
, graceful/0
, graceful_wait/0
, is_running/0
]).
-export([init/1, format_status/2,
handle_cast/2, handle_call/3, handle_info/2,
terminate/2, code_change/3]).
-include_lib("emqx/include/logger.hrl").
-define(TERMINATOR, ?MODULE).
-define(DO_IT, graceful_shutdown).
%% @doc This API is called to shutdown the Erlang VM by RPC call from remote shell node.
%% The shutown of apps is delegated to a to a process instead of doing it in the RPC spawned
%% process which has a remote group leader.
start_link() ->
{ok, _} = gen_server:start_link({local, ?TERMINATOR}, ?MODULE, [], []).
is_running() -> is_pid(whereis(?TERMINATOR)).
%% @doc Send a signal to activate the terminator.
graceful() ->
try
_ = gen_server:call(?TERMINATOR, ?DO_IT, infinity)
catch
_ : _ ->
%% failed to notify terminator, probably due to not started yet
%% or node is going down, either case, the caller
%% should issue a shutdown to be sure
%% NOTE: not exit_loop here because we do not want to
%% block erl_signal_server
init:stop()
end,
ok.
%% @doc Shutdown the Erlang VM and wait until the terminator dies or the VM dies.
graceful_wait() ->
case whereis(?TERMINATOR) of
undefined ->
?SLOG(warning, #{msg => "shutdown_before_boot_is_complete"}),
exit_loop();
Pid ->
ok = graceful(),
Ref = monitor(process, Pid),
%% NOTE: not exactly sure, but maybe there is a chance that
%% Erlang VM goes down before this receive.
%% In which case, the remote caller will get {badrpc, nodedown}
receive {'DOWN', Ref, process, Pid, _} -> ok end
end.
exit_loop() ->
init:stop(),
timer:sleep(100),
exit_loop().
init(_) ->
ok = emqx_machine_signal_handler:start(),
{ok, #{}}.
handle_info(_, State) ->
{noreply, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
handle_call(?DO_IT, _From, State) ->
try
emqx_machine:stop_apps(normal)
catch
C : E : St ->
Apps = [element(1, A) || A <- application:which_applications()],
?SLOG(error, #{msg => "failed_to_stop_apps",
exception => C,
reason => E,
stacktrace => St,
remaining_apps => Apps
})
after
init:stop()
end,
{reply, ok, State};
handle_call(_Call, _From, State) ->
{noreply, State}.
format_status(_Opt, [_Pdict,_S]) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Args, _State) ->
ok.

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_app_SUITE).
-module(emqx_machine_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -33,9 +33,9 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_shutdown_reboot(_Config) ->
emqx_machine_app:shutdown(normal),
emqx_machine:stop_apps(normal),
false = emqx:is_running(node()),
emqx_machine_app:ensure_apps_started(),
emqx_machine:ensure_apps_started(),
true = emqx:is_running(node()),
ok = emqx_machine_app:shutdown(for_test),
ok = emqx_machine:stop_apps(for_test),
false = emqx:is_running(node()).

View File

@ -0,0 +1,60 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_tests).
-include_lib("eunit/include/eunit.hrl").
sorted_reboot_apps_test_() ->
Apps1 = [{1, [2, 3, 4]},
{2, [3, 4]}
],
Apps2 = [{1, [2, 3, 4]},
{2, [3, 4]},
{5, [4, 3, 2, 1, 1]}
],
[fun() -> check_order(Apps1) end,
fun() -> check_order(Apps2) end
].
sorted_reboot_apps_cycle_test() ->
Apps = [{1,[2]},{2, [1,3]}],
?assertError({circular_application_dependency, [[1, 2, 1], [2, 1, 2]]},
check_order(Apps)).
check_order(Apps) ->
AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])),
Sorted = emqx_machine:sorted_reboot_apps(Apps),
case length(AllApps) =:= length(Sorted) of
true -> ok;
false -> error({AllApps, Sorted})
end,
{_, SortedWithIndex} =
lists:foldr(fun(A, {I, Acc}) -> {I + 1, [{A, I} | Acc]} end, {1, []}, Sorted),
do_check_order(Apps, SortedWithIndex).
do_check_order([], _) -> ok;
do_check_order([{A, Deps} | Rest], Sorted) ->
case lists:filter(fun(Dep) -> is_sorted_before(Dep, A, Sorted) end, Deps) of
[] -> do_check_order(Rest, Sorted);
Bad -> throw({A, Bad})
end.
is_sorted_before(A, B, Sorted) ->
{A, IndexA} = lists:keyfind(A, 1, Sorted),
{B, IndexB} = lists:keyfind(B, 1, Sorted),
IndexA < IndexB.

View File

@ -99,7 +99,7 @@ relx_usage() {
echo " don't make it permanent"
;;
*)
echo "Usage: $REL_NAME {start|start_boot <file>|ertspath|foreground|stop|restart|reboot|pid|ping|console|console_clean|console_boot <file>|attach|remote_console|upgrade|downgrade|install|uninstall|versions|escript|ctl|rpc|rpcterms|eval|root_dir}"
echo "Usage: $REL_NAME {start|start_boot <file>|ertspath|foreground|stop|pid|ping|console|console_clean|console_boot <file>|attach|remote_console|upgrade|downgrade|install|uninstall|versions|escript|ctl|rpc|rpcterms|eval|root_dir}"
;;
esac
}

View File

@ -72,9 +72,15 @@ do(Args) ->
%% a "pong"
io:format("pong\n");
["stop"] ->
io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]);
["restart", "-config", ConfigFile | _RestArgs1] ->
io:format("~p\n", [rpc:call(TargetNode, emqx, restart, [ConfigFile], 60000)]);
case rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], 60000) of
ok ->
ok;
{badrpc, nodedown} ->
%% nodetool commands are always executed after a ping
%% which if the code gets here, it's because the target node
%% has shutdown before RPC returns.
ok
end;
["rpc", Module, Function | RpcArgs] ->
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
[RpcArgs], 60000) of
@ -141,7 +147,7 @@ do(Args) ->
end;
Other ->
io:format("Other: ~p\n", [Other]),
io:format("Usage: nodetool {genconfig, chkconfig|getpid|ping|stop|restart|reboot|rpc|rpc_infinity|rpcterms|eval [Terms]} [RPC]\n")
io:format("Usage: nodetool {genconfig, chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval [Terms]} [RPC]\n")
end,
net_kernel:stop().