From bc23ff5e4709eb69812e5b79c6f3649a34b5ce7c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 4 Aug 2021 21:40:25 +0200 Subject: [PATCH 01/11] feat: add graceful shutdown prior to this change emqx node shutdown is done by init:stop which might have undesired stop order of the applications in this change, emqx_machine_terminator is added to stop apps in defined order and then terminate the node in infinite loop --- .ci/build_packages/tests.sh | 2 +- apps/emqx_machine/src/emqx_machine.erl | 27 ++++++++ apps/emqx_machine/src/emqx_machine_app.erl | 14 ++-- .../src/emqx_machine_terminator.erl | 67 +++++++++++++++++++ .../test/emqx_machine_app_SUITE.erl | 4 +- bin/emqx | 2 +- bin/nodetool | 14 ++-- 7 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 apps/emqx_machine/src/emqx_machine.erl create mode 100644 apps/emqx_machine/src/emqx_machine_terminator.erl diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index 37998395d..b5ba48f2c 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -138,7 +138,7 @@ EOF exit 1 fi IDLE_TIME=0 - while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do + while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do if [ $IDLE_TIME -gt 10 ] then echo "emqx running error" diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl new file mode 100644 index 000000000..d37d5cd4f --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -0,0 +1,27 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine). + +-export([start/0, + graceful_shutdown/0 + ]). + +start() -> + ok = emqx_machine_terminator:start(). + +graceful_shutdown() -> + emqx_machine_terminator:graceful(). diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index bc8c086c7..cf01af7a1 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -18,11 +18,9 @@ -export([ start/2 , stop/1 - , prep_stop/1 ]). -%% Shutdown and reboot --export([ shutdown/1 +-export([ stop_apps/1 , ensure_apps_started/0 ]). @@ -50,11 +48,9 @@ start(_Type, _Args) -> ok = print_vsn(), ok = start_autocluster(), + ok = emqx_machine:start(), {ok, RootSupPid}. -prep_stop(_State) -> - application:stop(emqx). - stop(_State) -> ok. @@ -96,13 +92,13 @@ load_config_files() -> ok = emqx_app:set_init_config_load_done(). start_autocluster() -> - ekka:callback(prepare, fun ?MODULE:shutdown/1), + ekka:callback(prepare, fun ?MODULE:stop_apps/1), ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec ok. -shutdown(Reason) -> - ?SLOG(critical, #{msg => "stopping_apps", reason => Reason}), +stop_apps(Reason) -> + ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), _ = emqx_alarm_handler:unload(), lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). 
diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl new file mode 100644 index 000000000..7ed5f4d99 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_terminator). + +-export([ start/0 + , graceful/0 + , terminator_loop/0 + ]). + +-define(TERMINATOR, ?MODULE). + +%% @doc This API is called to shutdown the Erlang VM by RPC call from remote shell node. +%% The shutown of apps is delegated to a to a process instead of doing it in the RPC spawned +%% process which has a remote group leader. +start() -> + _ = spawn_link( + fun() -> + register(?TERMINATOR, self()), + terminator_loop() + end), + ok. + +%% internal use +terminator_loop() -> + receive + graceful_shutdown -> + ok = emqx_machine_app:stop_apps(normal), + exit_loop() + after + 1000 -> + %% keep looping for beam reload + ?MODULE:terminator_loop() + end. + +%% @doc Shutdown the Erlang VM. +graceful() -> + case whereis(?TERMINATOR) of + undefined -> + exit(emqx_machine_not_started); + Pid -> + Pid ! 
graceful_shutdown, + Ref = monitor(process, Pid), + %% NOTE: not exactly sure, but maybe there is a chance that + %% Erlang VM goes down before this receive. + %% In which case, the remote caller will get {badrpc, nodedown} + receive {'DOWN', Ref, process, Pid, _} -> ok end + end. + +%% Loop until Erlang VM exits +exit_loop() -> + init:stop(), + timer:sleep(100), + exit_loop(). diff --git a/apps/emqx_machine/test/emqx_machine_app_SUITE.erl b/apps/emqx_machine/test/emqx_machine_app_SUITE.erl index c1d666e53..e292af0ed 100644 --- a/apps/emqx_machine/test/emqx_machine_app_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_app_SUITE.erl @@ -33,9 +33,9 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_shutdown_reboot(_Config) -> - emqx_machine_app:shutdown(normal), + emqx_machine_app:stop_apps(normal), false = emqx:is_running(node()), emqx_machine_app:ensure_apps_started(), true = emqx:is_running(node()), - ok = emqx_machine_app:shutdown(for_test), + ok = emqx_machine_app:stop_apps(for_test), false = emqx:is_running(node()). 
diff --git a/bin/emqx b/bin/emqx index 0038cd2c7..0afa81bc5 100755 --- a/bin/emqx +++ b/bin/emqx @@ -99,7 +99,7 @@ relx_usage() { echo " don't make it permanent" ;; *) - echo "Usage: $REL_NAME {start|start_boot |ertspath|foreground|stop|restart|reboot|pid|ping|console|console_clean|console_boot |attach|remote_console|upgrade|downgrade|install|uninstall|versions|escript|ctl|rpc|rpcterms|eval|root_dir}" + echo "Usage: $REL_NAME {start|start_boot |ertspath|foreground|stop|pid|ping|console|console_clean|console_boot |attach|remote_console|upgrade|downgrade|install|uninstall|versions|escript|ctl|rpc|rpcterms|eval|root_dir}" ;; esac } diff --git a/bin/nodetool b/bin/nodetool index 373fdf97b..377ade040 100755 --- a/bin/nodetool +++ b/bin/nodetool @@ -72,9 +72,15 @@ do(Args) -> %% a "pong" io:format("pong\n"); ["stop"] -> - io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]); - ["restart", "-config", ConfigFile | _RestArgs1] -> - io:format("~p\n", [rpc:call(TargetNode, emqx, restart, [ConfigFile], 60000)]); + case rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], 60000) of + ok -> + ok; + {badrpc, nodedown} -> + %% nodetool commands are always executed after a ping + %% which if the code gets here, it's because the target node + %% has shutdown before RPC returns. + ok + end; ["rpc", Module, Function | RpcArgs] -> case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function), [RpcArgs], 60000) of @@ -141,7 +147,7 @@ do(Args) -> end; Other -> io:format("Other: ~p\n", [Other]), - io:format("Usage: nodetool {genconfig, chkconfig|getpid|ping|stop|restart|reboot|rpc|rpc_infinity|rpcterms|eval [Terms]} [RPC]\n") + io:format("Usage: nodetool {genconfig, chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval [Terms]} [RPC]\n") end, net_kernel:stop(). 
From 4025d31295ea762b30ce860674012c2fca7f027d Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 5 Aug 2021 14:27:48 +0200 Subject: [PATCH 02/11] refactor(emqx_machine): move code from _app module to emqx_machine --- apps/emqx_machine/src/emqx_machine.erl | 138 +++++++++++++++++ apps/emqx_machine/src/emqx_machine_app.erl | 140 +----------------- .../src/emqx_machine_terminator.erl | 2 +- ...e_app_SUITE.erl => emqx_machine_SUITE.erl} | 8 +- 4 files changed, 144 insertions(+), 144 deletions(-) rename apps/emqx_machine/test/{emqx_machine_app_SUITE.erl => emqx_machine_SUITE.erl} (88%) diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index d37d5cd4f..232ee4a76 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -20,8 +20,146 @@ graceful_shutdown/0 ]). +-export([ stop_apps/1 + , ensure_apps_started/0 + ]). + +-export([sorted_reboot_apps/0]). + +-include_lib("emqx/include/logger.hrl"). + +%% @doc EMQ X boot entrypoint. start() -> + ok = set_backtrace_depth(), + ok = print_otp_version_warning(), + + %% need to load some app envs + %% TODO delete it once emqx boot does not depend on modules envs + _ = load_modules(), + ok = load_config_files(), + + + ok = ensure_apps_started(), + + _ = emqx_plugins:load(), + + ok = print_vsn(), + + ok = start_autocluster(), ok = emqx_machine_terminator:start(). graceful_shutdown() -> emqx_machine_terminator:graceful(). + +set_backtrace_depth() -> + {ok, Depth} = application:get_env(emqx_machine, backtrace_depth), + _ = erlang:system_flag(backtrace_depth, Depth), + ok. + +-if(?OTP_RELEASE > 22). +print_otp_version_warning() -> ok. +-else. +print_otp_version_warning() -> + ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", + [?OTP_RELEASE]). +-endif. % OTP_RELEASE > 22 + +-ifdef(TEST). +print_vsn() -> ok. +-else. % TEST +print_vsn() -> + ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). 
+-endif. % TEST + +-ifndef(EMQX_ENTERPRISE). +load_modules() -> + application:load(emqx_modules). +-else. +load_modules() -> + ok. +-endif. + +load_config_files() -> + %% the app env 'config_files' for 'emqx` app should be set + %% in app.time.config by boot script before starting Erlang VM + ConfFiles = application:get_env(emqx, config_files, []), + %% emqx_machine_schema is a superset of emqx_schema + ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), + %% to avoid config being loaded again when emqx app starts. + ok = emqx_app:set_init_config_load_done(). + +start_autocluster() -> + ekka:callback(prepare, fun ?MODULE:stop_apps/1), + ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), + _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec + ok. + +stop_apps(Reason) -> + ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), + _ = emqx_alarm_handler:unload(), + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). + +stop_one_app(App) -> + ?SLOG(debug, #{msg => "stopping_app", app => App}), + application:stop(App). + +ensure_apps_started() -> + lists:foreach(fun start_one_app/1, sorted_reboot_apps()). + +start_one_app(App) -> + ?SLOG(debug, #{msg => "starting_app", app => App}), + case application:ensure_all_started(App) of + {ok, Apps} -> + ?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]}); + {error, Reason} -> + ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), + error({faile_to_start_app, App, Reason}) + end. + +%% list of app names which should be rebooted when: +%% 1. due to static static config change +%% 2. after join a cluster +reboot_apps() -> + [gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS]. + +sorted_reboot_apps() -> + Apps = [{App, app_deps(App)} || App <- reboot_apps()], + sorted_reboot_apps(Apps). 
+ +app_deps(App) -> + case application:get_key(App, applications) of + undefined -> []; + {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) + end. + +sorted_reboot_apps(Apps) -> + G = digraph:new(), + lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), + case digraph_utils:topsort(G) of + Sorted when is_list(Sorted) -> + Sorted; + false -> + Loops = find_loops(G), + error({circular_application_dependency, Loops}) + end. + +add_app(G, App, undefined) -> + ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), + %% not loaded + add_app(G, App, []); +add_app(_G, _App, []) -> + ok; +add_app(G, App, [Dep | Deps]) -> + digraph:add_vertex(G, App), + digraph:add_vertex(G, Dep), + digraph:add_edge(G, Dep, App), %% dep -> app as dependency + add_app(G, App, Deps). + +find_loops(G) -> + lists:filtermap( + fun (App) -> + case digraph:get_short_cycle(G, App) of + false -> false; + Apps -> {true, Apps} + end + end, digraph:vertices(G)). diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index cf01af7a1..9a9b13d8f 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -20,149 +20,11 @@ , stop/1 ]). --export([ stop_apps/1 - , ensure_apps_started/0 - ]). - --export([sorted_reboot_apps/0]). - -behaviour(application). --include_lib("emqx/include/logger.hrl"). - start(_Type, _Args) -> - ok = set_backtrace_depth(), - ok = print_otp_version_warning(), - - %% need to load some app envs - %% TODO delete it once emqx boot does not depend on modules envs - _ = load_modules(), - ok = load_config_files(), - - {ok, RootSupPid} = emqx_machine_sup:start_link(), - - ok = ensure_apps_started(), - - _ = emqx_plugins:load(), - - ok = print_vsn(), - - ok = start_autocluster(), ok = emqx_machine:start(), - {ok, RootSupPid}. + emqx_machine_sup:start_link(). stop(_State) -> ok. 
- -set_backtrace_depth() -> - {ok, Depth} = application:get_env(emqx_machine, backtrace_depth), - _ = erlang:system_flag(backtrace_depth, Depth), - ok. - --if(?OTP_RELEASE > 22). -print_otp_version_warning() -> ok. --else. -print_otp_version_warning() -> - ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", - [?OTP_RELEASE]). --endif. % OTP_RELEASE > 22 - --ifdef(TEST). -print_vsn() -> ok. --else. % TEST -print_vsn() -> - ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). --endif. % TEST - --ifndef(EMQX_ENTERPRISE). -load_modules() -> - application:load(emqx_modules). --else. -load_modules() -> - ok. --endif. - -load_config_files() -> - %% the app env 'config_files' for 'emqx` app should be set - %% in app.time.config by boot script before starting Erlang VM - ConfFiles = application:get_env(emqx, config_files, []), - %% emqx_machine_schema is a superset of emqx_schema - ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), - %% to avoid config being loaded again when emqx app starts. - ok = emqx_app:set_init_config_load_done(). - -start_autocluster() -> - ekka:callback(prepare, fun ?MODULE:stop_apps/1), - ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), - _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec - ok. - -stop_apps(Reason) -> - ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), - _ = emqx_alarm_handler:unload(), - lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). - -stop_one_app(App) -> - ?SLOG(debug, #{msg => "stopping_app", app => App}), - application:stop(App). - -ensure_apps_started() -> - lists:foreach(fun start_one_app/1, sorted_reboot_apps()). 
- -start_one_app(App) -> - ?SLOG(debug, #{msg => "starting_app", app => App}), - case application:ensure_all_started(App) of - {ok, Apps} -> - ?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]}); - {error, Reason} -> - ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), - error({faile_to_start_app, App, Reason}) - end. - -%% list of app names which should be rebooted when: -%% 1. due to static static config change -%% 2. after join a cluster -reboot_apps() -> - [gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS]. - -sorted_reboot_apps() -> - Apps = [{App, app_deps(App)} || App <- reboot_apps()], - sorted_reboot_apps(Apps). - -app_deps(App) -> - case application:get_key(App, applications) of - undefined -> []; - {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) - end. - -sorted_reboot_apps(Apps) -> - G = digraph:new(), - lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), - case digraph_utils:topsort(G) of - Sorted when is_list(Sorted) -> - Sorted; - false -> - Loops = find_loops(G), - error({circular_application_dependency, Loops}) - end. - -add_app(G, App, undefined) -> - ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), - %% not loaded - add_app(G, App, []); -add_app(_G, _App, []) -> - ok; -add_app(G, App, [Dep | Deps]) -> - digraph:add_vertex(G, App), - digraph:add_vertex(G, Dep), - digraph:add_edge(G, Dep, App), %% dep -> app as dependency - add_app(G, App, Deps). - -find_loops(G) -> - lists:filtermap( - fun (App) -> - case digraph:get_short_cycle(G, App) of - false -> false; - Apps -> {true, Apps} - end - end, digraph:vertices(G)). 
diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 7ed5f4d99..35faf84f1 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -38,7 +38,7 @@ start() -> terminator_loop() -> receive graceful_shutdown -> - ok = emqx_machine_app:stop_apps(normal), + ok = emqx_machine:stop_apps(normal), exit_loop() after 1000 -> diff --git a/apps/emqx_machine/test/emqx_machine_app_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl similarity index 88% rename from apps/emqx_machine/test/emqx_machine_app_SUITE.erl rename to apps/emqx_machine/test/emqx_machine_SUITE.erl index e292af0ed..51cf4f8b4 100644 --- a/apps/emqx_machine/test/emqx_machine_app_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_machine_app_SUITE). +-module(emqx_machine_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -33,9 +33,9 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_shutdown_reboot(_Config) -> - emqx_machine_app:stop_apps(normal), + emqx_machine:stop_apps(normal), false = emqx:is_running(node()), - emqx_machine_app:ensure_apps_started(), + emqx_machine:ensure_apps_started(), true = emqx:is_running(node()), - ok = emqx_machine_app:stop_apps(for_test), + ok = emqx_machine:stop_apps(for_test), false = emqx:is_running(node()). 
From bee8f01ee8fa135e893b0cd9c380880b1959e6d9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 01:59:20 +0200 Subject: [PATCH 03/11] feat(emqx_machine): add a kill signal handler Now the signal from kill PID can also be handled gracefully --- apps/emqx_machine/src/emqx_machine.erl | 2 +- .../src/emqx_machine_signal_handler.erl | 61 +++++++++++++++++ .../src/emqx_machine_terminator.erl | 65 ++++++++++++------- 3 files changed, 103 insertions(+), 25 deletions(-) create mode 100644 apps/emqx_machine/src/emqx_machine_signal_handler.erl diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 232ee4a76..0c077816c 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -49,7 +49,7 @@ start() -> ok = emqx_machine_terminator:start(). graceful_shutdown() -> - emqx_machine_terminator:graceful(). + emqx_machine_terminator:graceful_wait(). set_backtrace_depth() -> {ok, Depth} = application:get_env(emqx_machine, backtrace_depth), diff --git a/apps/emqx_machine/src/emqx_machine_signal_handler.erl b/apps/emqx_machine/src/emqx_machine_signal_handler.erl new file mode 100644 index 000000000..c75261f51 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_signal_handler.erl @@ -0,0 +1,61 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% This module implements a gen_event handler which +%% swap-in replaces the default one from OTP. +%% The kill signal (sigterm) is captured so we can +%% perform graceful shutdown. +-module(emqx_machine_signal_handler). + +-export([start/0, init/1, format_status/2, + handle_event/2, handle_call/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("emqx/include/logger.hrl"). + +start() -> + ok = gen_event:swap_sup_handler( + erl_signal_server, + {erl_signal_handler, []}, + {?MODULE, []}). + +init({[], _}) -> {ok, #{}}. + +handle_event(sigterm, State) -> + ?ULOG("Received terminate signal, shutting down now~n", []), + emqx_machine_terminator:graceful(), + {ok, State}; +handle_event(Event, State) -> + %% delegate other events back to erl_signal_handler + %% erl_signal_handler does not make use of the State + %% so we can pass whatever from here + erl_signal_handler:handle_event(Event, State), + {ok, State}. + +handle_info(stop, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, ok, State}. + +format_status(_Opt, [_Pdict,_S]) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Args, _State) -> + ok. diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 35faf84f1..fdd75d3ad 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -16,43 +16,40 @@ -module(emqx_machine_terminator). +-behaviour(gen_server). + -export([ start/0 , graceful/0 - , terminator_loop/0 + , graceful_wait/0 ]). +-export([init/1, format_status/2, + handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3]). + -define(TERMINATOR, ?MODULE). +-define(DO_IT, graceful_shutdown). 
%% @doc This API is called to shutdown the Erlang VM by RPC call from remote shell node. %% The shutown of apps is delegated to a to a process instead of doing it in the RPC spawned %% process which has a remote group leader. start() -> - _ = spawn_link( - fun() -> - register(?TERMINATOR, self()), - terminator_loop() - end), + {ok, _} = gen_server:start_link({local, ?TERMINATOR}, ?MODULE, [], []), + %% NOTE: Do not link this process under any supervision tree ok. -%% internal use -terminator_loop() -> - receive - graceful_shutdown -> - ok = emqx_machine:stop_apps(normal), - exit_loop() - after - 1000 -> - %% keep looping for beam reload - ?MODULE:terminator_loop() - end. - -%% @doc Shutdown the Erlang VM. +%% @doc Send a signal to activate the terminator. graceful() -> + ?TERMINATOR ! ?DO_IT, + ok. + +%% @doc Shutdown the Erlang VM and wait until the terminator dies or the VM dies. +graceful_wait() -> case whereis(?TERMINATOR) of undefined -> exit(emqx_machine_not_started); Pid -> - Pid ! graceful_shutdown, + ok = graceful(), Ref = monitor(process, Pid), %% NOTE: not exactly sure, but maybe there is a chance that %% Erlang VM goes down before this receive. @@ -60,8 +57,28 @@ graceful() -> receive {'DOWN', Ref, process, Pid, _} -> ok end end. -%% Loop until Erlang VM exits -exit_loop() -> +init(_) -> + ok = emqx_machine_signal_handler:start(), + {ok, #{}}. + +handle_info(?DO_IT, State) -> + ok = emqx_machine:stop_apps(normal), init:stop(), - timer:sleep(100), - exit_loop(). + {noreply, State}; +handle_info(_, State) -> + {noreply, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_call(_Call, _From, State) -> + {noreply, State}. + +format_status(_Opt, [_Pdict,_S]) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Args, _State) -> + ok. 
From 85f8ba10ce9ad2c87dc212a9c1aba8db537cc07c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 08:21:06 +0200 Subject: [PATCH 04/11] fix(emqx_machine_signal_handler): ignore unknown info --- apps/emqx_machine/src/emqx_machine_signal_handler.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_machine/src/emqx_machine_signal_handler.erl b/apps/emqx_machine/src/emqx_machine_signal_handler.erl index c75261f51..d8cc545d7 100644 --- a/apps/emqx_machine/src/emqx_machine_signal_handler.erl +++ b/apps/emqx_machine/src/emqx_machine_signal_handler.erl @@ -42,10 +42,12 @@ handle_event(Event, State) -> %% delegate other events back to erl_signal_handler %% erl_signal_handler does not make use of the State %% so we can pass whatever from here - erl_signal_handler:handle_event(Event, State), + _ = erl_signal_handler:handle_event(Event, State), {ok, State}. handle_info(stop, State) -> + {ok, State}; +handle_info(_Other, State) -> {ok, State}. handle_call(_Request, State) -> From 044e0846983a2cb430699431fe631795a3fdd361 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 08:21:38 +0200 Subject: [PATCH 05/11] feat(emqx_machine): ignore sighup, ensure sigterm --- apps/emqx_machine/src/emqx_machine.erl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 0c077816c..69a0fc05e 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -30,6 +30,8 @@ %% @doc EMQ X boot entrypoint. start() -> + os:set_signal(sighup, ignore), + os:set_signal(sigterm, handle), %% default is handle ok = set_backtrace_depth(), ok = print_otp_version_warning(), @@ -101,7 +103,16 @@ stop_apps(Reason) -> stop_one_app(App) -> ?SLOG(debug, #{msg => "stopping_app", app => App}), - application:stop(App). 
+ try + _ = application:stop(App) + catch + C : E -> + ?SLOG(error, #{msg => "failed_to_stop_app", + app => App, + exception => C, + reason => E}) + end. + ensure_apps_started() -> lists:foreach(fun start_one_app/1, sorted_reboot_apps()). @@ -110,7 +121,7 @@ start_one_app(App) -> ?SLOG(debug, #{msg => "starting_app", app => App}), case application:ensure_all_started(App) of {ok, Apps} -> - ?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]}); + ?SLOG(debug, #{msg => "started_apps", apps => Apps}); {error, Reason} -> ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), error({faile_to_start_app, App, Reason}) From 81c9dcb6aed9a4afa9baa3e759db9830abefa131 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 08:51:48 +0200 Subject: [PATCH 06/11] refactor(emqx_machine_terminator): future-proof try-catch Ensure exceptions in emqx_machine:stop_apps/1 are caught and init:stop/0 is called in the after clause --- .../src/emqx_machine_terminator.erl | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index fdd75d3ad..f26287258 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -27,6 +27,8 @@ handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]). +-include_lib("emqx/include/logger.hrl"). + -define(TERMINATOR, ?MODULE). -define(DO_IT, graceful_shutdown). @@ -62,8 +64,20 @@ init(_) -> {ok, #{}}. 
handle_info(?DO_IT, State) -> - ok = emqx_machine:stop_apps(normal), - init:stop(), + try + emqx_machine:stop_apps(normal) + catch + C : E : St -> + Apps = [element(1, A) || A <- application:which_applications()], + ?SLOG(error, #{msg => "failed_to_stop_apps", + exception => C, + reason => E, + stacktrace => St, + remaining_apps => Apps + }) + after + init:stop() + end, {noreply, State}; handle_info(_, State) -> {noreply, State}. From 304b322a0c2567e111db8d52df127da22042cf95 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 10:20:42 +0200 Subject: [PATCH 07/11] fix(emqx_machine): handle early shutdown --- apps/emqx_machine/src/emqx_machine.erl | 11 ++++++++--- apps/emqx_machine/src/emqx_machine_terminator.erl | 11 ++++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 69a0fc05e..59587926a 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -16,8 +16,9 @@ -module(emqx_machine). --export([start/0, - graceful_shutdown/0 +-export([ start/0 + , graceful_shutdown/0 + , is_ready/0 ]). -export([ stop_apps/1 @@ -40,7 +41,6 @@ start() -> _ = load_modules(), ok = load_config_files(), - ok = ensure_apps_started(), _ = emqx_plugins:load(), @@ -48,6 +48,7 @@ start() -> ok = print_vsn(), ok = start_autocluster(), + %% NOTE: keep this to the end ok = emqx_machine_terminator:start(). graceful_shutdown() -> @@ -58,6 +59,10 @@ set_backtrace_depth() -> _ = erlang:system_flag(backtrace_depth, Depth), ok. +%% @doc Return true if boot is complete. +is_ready() -> + emqx_machine_terminator:is_running(). + -if(?OTP_RELEASE > 22). print_otp_version_warning() -> ok. -else. 
diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index f26287258..9a7ff6d9c 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -21,6 +21,7 @@ -export([ start/0 , graceful/0 , graceful_wait/0 + , is_running/0 ]). -export([init/1, format_status/2, @@ -40,6 +41,8 @@ start() -> %% NOTE: Do not link this process under any supervision tree ok. +is_running() -> is_pid(whereis(?TERMINATOR)). + %% @doc Send a signal to activate the terminator. graceful() -> ?TERMINATOR ! ?DO_IT, @@ -49,7 +52,8 @@ graceful() -> graceful_wait() -> case whereis(?TERMINATOR) of undefined -> - exit(emqx_machine_not_started); + ?SLOG(warning, #{msg => "shutdown_before_boot_is_complete"}), + exit_loop(); Pid -> ok = graceful(), Ref = monitor(process, Pid), @@ -59,6 +63,11 @@ graceful_wait() -> receive {'DOWN', Ref, process, Pid, _} -> ok end end. +exit_loop() -> + init:stop(), + timer:sleep(100), + exit_loop(). + init(_) -> ok = emqx_machine_signal_handler:start(), {ok, #{}}. From 75f9741d75bf41cad35787dccec18784e2177f6f Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 11:02:52 +0200 Subject: [PATCH 08/11] fix(terminator): ensure erl_signal_server init:stop ensure init:stop can be triggered by kill signal even before the terminator is ready --- .../src/emqx_machine_terminator.erl | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 9a7ff6d9c..20e818d99 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -45,7 +45,17 @@ is_running() -> is_pid(whereis(?TERMINATOR)). %% @doc Send a signal to activate the terminator. graceful() -> - ?TERMINATOR ! 
?DO_IT, + try + _ = gen_server:call(?TERMINATOR, ?DO_IT, infinity) + catch + _ : _ -> + %% failed to notify terminator, probably due to not started yet + %% or node is going down, either case, the caller + %% should issue a shutdown to be sure + %% NOTE: not exit_loop here because we do not want to + %% block erl_signal_server + init:stop() + end, ok. %% @doc Shutdown the Erlang VM and wait until the terminator dies or the VM dies. @@ -72,7 +82,13 @@ init(_) -> ok = emqx_machine_signal_handler:start(), {ok, #{}}. -handle_info(?DO_IT, State) -> +handle_info(_, State) -> + {noreply, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_call(?DO_IT, _From, State) -> try emqx_machine:stop_apps(normal) catch @@ -87,13 +103,7 @@ handle_info(?DO_IT, State) -> after init:stop() end, - {noreply, State}; -handle_info(_, State) -> - {noreply, State}. - -handle_cast(_Cast, State) -> - {noreply, State}. - + {reply, ok, State}; handle_call(_Call, _From, State) -> {noreply, State}. From 5063d3a2b32b507bc3a8beeb282a3083da0c7a03 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 12:07:50 +0200 Subject: [PATCH 09/11] fix(emqx_machine): ensure digraph is deleted after use also add tests --- apps/emqx_machine/src/emqx_machine.erl | 22 ++++--- apps/emqx_machine/test/emqx_machine_tests.erl | 60 +++++++++++++++++++ 2 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_machine/test/emqx_machine_tests.erl diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 59587926a..229e8ca7e 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -27,6 +27,10 @@ -export([sorted_reboot_apps/0]). +-ifdef(TEST). +-export([sorted_reboot_apps/1]). +-endif. + -include_lib("emqx/include/logger.hrl"). %% @doc EMQ X boot entrypoint. 
@@ -150,13 +154,17 @@ app_deps(App) -> sorted_reboot_apps(Apps) -> G = digraph:new(), - lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), - case digraph_utils:topsort(G) of - Sorted when is_list(Sorted) -> - Sorted; - false -> - Loops = find_loops(G), - error({circular_application_dependency, Loops}) + try + lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), + case digraph_utils:topsort(G) of + Sorted when is_list(Sorted) -> + Sorted; + false -> + Loops = find_loops(G), + error({circular_application_dependency, Loops}) + end + after + digraph:delete(G) end. add_app(G, App, undefined) -> diff --git a/apps/emqx_machine/test/emqx_machine_tests.erl b/apps/emqx_machine/test/emqx_machine_tests.erl new file mode 100644 index 000000000..dded07570 --- /dev/null +++ b/apps/emqx_machine/test/emqx_machine_tests.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_tests). + +-include_lib("eunit/include/eunit.hrl"). + +sorted_reboot_apps_test_() -> + Apps1 = [{1, [2, 3, 4]}, + {2, [3, 4]} + ], + Apps2 = [{1, [2, 3, 4]}, + {2, [3, 4]}, + {5, [4, 3, 2, 1, 1]} + ], + [fun() -> check_order(Apps1) end, + fun() -> check_order(Apps2) end + ]. 
+ +sorted_reboot_apps_cycle_test() -> + Apps = [{1,[2]},{2, [1,3]}], + ?assertError({circular_application_dependency, [[1, 2, 1], [2, 1, 2]]}, + check_order(Apps)). + + +check_order(Apps) -> + AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])), + Sorted = emqx_machine:sorted_reboot_apps(Apps), + case length(AllApps) =:= length(Sorted) of + true -> ok; + false -> error({AllApps, Sorted}) + end, + {_, SortedWithIndex} = + lists:foldr(fun(A, {I, Acc}) -> {I + 1, [{A, I} | Acc]} end, {1, []}, Sorted), + do_check_order(Apps, SortedWithIndex). + +do_check_order([], _) -> ok; +do_check_order([{A, Deps} | Rest], Sorted) -> + case lists:filter(fun(Dep) -> is_sorted_before(Dep, A, Sorted) end, Deps) of + [] -> do_check_order(Rest, Sorted); + Bad -> throw({A, Bad}) + end. + +is_sorted_before(A, B, Sorted) -> + {A, IndexA} = lists:keyfind(A, 1, Sorted), + {B, IndexB} = lists:keyfind(B, 1, Sorted), + IndexA < IndexB. From 032a49114c74eec9109ddf8601882e0a58ca5cf7 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 12:15:20 +0200 Subject: [PATCH 10/11] fix(emqx_machine): start terminator in emqx_machine_app --- apps/emqx_machine/src/emqx_machine.erl | 4 +--- apps/emqx_machine/src/emqx_machine_app.erl | 4 +++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 229e8ca7e..fcf8d3239 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -51,9 +51,7 @@ start() -> ok = print_vsn(), - ok = start_autocluster(), - %% NOTE: keep this to the end - ok = emqx_machine_terminator:start(). + ok = start_autocluster(). graceful_shutdown() -> emqx_machine_terminator:graceful_wait(). 
diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index 9a9b13d8f..609df37d1 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -24,7 +24,9 @@ start(_Type, _Args) -> ok = emqx_machine:start(), - emqx_machine_sup:start_link(). + {ok, Sup} = emqx_machine_sup:start_link(), + ok = emqx_machine_terminator:start(), + {ok, Sup}. stop(_State) -> ok. From e698600903a45be4dccb03930d7aae081d6ead9f Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 6 Aug 2021 12:51:25 +0200 Subject: [PATCH 11/11] refactor(emqx_machine): supervise terminator --- apps/emqx_machine/src/emqx_machine_app.erl | 4 +--- apps/emqx_machine/src/emqx_machine_sup.erl | 11 ++++++----- apps/emqx_machine/src/emqx_machine_terminator.erl | 8 +++----- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index 609df37d1..9a9b13d8f 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -24,9 +24,7 @@ start(_Type, _Args) -> ok = emqx_machine:start(), - {ok, Sup} = emqx_machine_sup:start_link(), - ok = emqx_machine_terminator:start(), - {ok, Sup}. + emqx_machine_sup:start_link(). stop(_State) -> ok. diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 114ed324f..0810eb267 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -29,18 +29,19 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
init([]) -> - GlobalGC = child_worker(emqx_global_gc, []), - Children = [GlobalGC], - SupFlags = #{strategy => one_for_all, + GlobalGC = child_worker(emqx_global_gc, [], permanent), + Terminator = child_worker(emqx_machine_terminator, [], transient), + Children = [GlobalGC, Terminator], + SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 }, {ok, {SupFlags, Children}}. -child_worker(M, Args) -> +child_worker(M, Args, Restart) -> #{id => M, start => {M, start_link, Args}, - restart => permanent, + restart => Restart, shutdown => 5000, type => worker, modules => [M] diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 20e818d99..524cf316d 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([ start/0 +-export([ start_link/0 , graceful/0 , graceful_wait/0 , is_running/0 @@ -36,10 +36,8 @@ %% @doc This API is called to shutdown the Erlang VM by RPC call from remote shell node. %% The shutown of apps is delegated to a to a process instead of doing it in the RPC spawned %% process which has a remote group leader. -start() -> - {ok, _} = gen_server:start_link({local, ?TERMINATOR}, ?MODULE, [], []), - %% NOTE: Do not link this process under any supervision tree - ok. +start_link() -> + {ok, _} = gen_server:start_link({local, ?TERMINATOR}, ?MODULE, [], []). is_running() -> is_pid(whereis(?TERMINATOR)).