fix(emqx_machine): Fix start/stop callbacks (#5969)

* fix(emqx_machine): Fix start/stop callbacks

* chore(ekka): Bump version to 0.11.1

* fix(router): Wait for the tables

* fix(emqx_cluster_rpc): Stop cluster RPC when joining a cluster

* fix(emqx_app): Fix a deadlock when joining the cluster

* fix(emqx_telemetry): Wait for mnesia tables

* test(ct_helper): Start ekka before emqx
This commit is contained in:
k32 2021-10-21 08:49:21 +02:00 committed by x1001100011
parent 2f2a093150
commit 6697b9fa42
11 changed files with 55 additions and 16 deletions

View File

@ -15,7 +15,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.3"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.3"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

View File

@ -41,7 +41,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
ok = maybe_load_config(), ok = maybe_load_config(),
ok = maybe_start_quicer(), ok = maybe_start_quicer(),
ensure_ekka_started(), wait_boot_shards(),
{ok, Sup} = emqx_sup:start_link(), {ok, Sup} = emqx_sup:start_link(),
ok = maybe_start_listeners(), ok = maybe_start_listeners(),
ok = emqx_alarm_handler:load(), ok = emqx_alarm_handler:load(),
@ -55,8 +55,7 @@ prep_stop(_State) ->
stop(_State) -> ok. stop(_State) -> ok.
ensure_ekka_started() -> wait_boot_shards() ->
ekka:start(),
ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity). ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
%% @doc Call this function to make emqx boot without loading config, %% @doc Call this function to make emqx boot without loading config,

View File

@ -92,6 +92,7 @@ monitor(Node) when is_atom(Node) ->
init([]) -> init([]) ->
ok = ekka:monitor(membership), ok = ekka:monitor(membership),
_ = mria:wait_for_tables([?ROUTING_NODE]),
{ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}), {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
Nodes = lists:foldl( Nodes = lists:foldl(
fun(Node, Acc) -> fun(Node, Acc) ->

View File

@ -133,6 +133,7 @@ start_apps(Apps, Handler) when is_function(Handler) ->
%% Load all application code to beam vm first %% Load all application code to beam vm first
%% Because, minirest, ekka etc.. application will scan these modules %% Because, minirest, ekka etc.. application will scan these modules
lists:foreach(fun load/1, [emqx | Apps]), lists:foreach(fun load/1, [emqx | Apps]),
ekka:start(),
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]). lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
load(App) -> load(App) ->
@ -195,7 +196,7 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
-spec(stop_apps(list()) -> ok). -spec(stop_apps(list()) -> ok).
stop_apps(Apps) -> stop_apps(Apps) ->
[application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]], [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
ok. ok.
%% backward compatible %% backward compatible

View File

@ -135,6 +135,7 @@ skip_failed_commit(Node) ->
%% @private %% @private
init([Node, RetryMs]) -> init([Node, RetryMs]) ->
_ = mria:wait_for_tables([?CLUSTER_MFA]),
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
{ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.

View File

@ -18,7 +18,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-export([post_boot/0]). -export([post_boot/0]).
-export([stop_apps/1, ensure_apps_started/0]). -export([stop_apps/0, ensure_apps_started/0]).
-export([sorted_reboot_apps/0]). -export([sorted_reboot_apps/0]).
-export([start_autocluster/0]). -export([start_autocluster/0]).
@ -42,15 +42,16 @@ print_vsn() ->
start_autocluster() -> start_autocluster() ->
ekka:callback(prepare, fun ?MODULE:stop_apps/1), ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
ok. ok.
stop_apps(Reason) -> stop_apps() ->
?SLOG(info, #{msg => "stopping_apps", reason => Reason}), ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(), _ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
emqx_machine_sup:stop_cluster_rpc().
stop_one_app(App) -> stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}), ?SLOG(debug, #{msg => "stopping_app", app => App}),
@ -64,8 +65,11 @@ stop_one_app(App) ->
reason => E}) reason => E})
end. end.
ensure_apps_started() -> ensure_apps_started() ->
?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
%% FIXME: Hack spawning the cluster RPC asynchronously to avoid a
%% deadlock somewhere in EMQ X startup
spawn_link(fun() -> emqx_machine_sup:start_cluster_rpc() end),
lists:foreach(fun start_one_app/1, sorted_reboot_apps()). lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) -> start_one_app(App) ->

View File

@ -21,6 +21,8 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ start_link/0 -export([ start_link/0
, stop_cluster_rpc/0
, start_cluster_rpc/0
]). ]).
-export([init/1]). -export([init/1]).
@ -28,6 +30,26 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
stop_cluster_rpc() ->
case whereis(?MODULE) of
undefined ->
ok;
_ ->
_ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc_handler),
_ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc),
ok
end.
start_cluster_rpc() ->
case whereis(?MODULE) of
undefined ->
ok;
_ ->
ensure_running(emqx_cluster_rpc),
ensure_running(emqx_cluster_rpc_handler),
ok
end.
init([]) -> init([]) ->
GlobalGC = child_worker(emqx_global_gc, [], permanent), GlobalGC = child_worker(emqx_global_gc, [], permanent),
Terminator = child_worker(emqx_machine_terminator, [], transient), Terminator = child_worker(emqx_machine_terminator, [], transient),
@ -52,3 +74,13 @@ child_worker(M, Func, Args, Restart) ->
type => worker, type => worker,
modules => [M] modules => [M]
}. }.
ensure_running(Id) ->
%% Assuming Id == locally registered name
case whereis(Id) of
undefined ->
_ = supervisor:restart_child(?MODULE, Id),
ok;
_ ->
ok
end.

View File

@ -80,7 +80,7 @@ handle_cast(_Cast, State) ->
handle_call(?DO_IT, _From, State) -> handle_call(?DO_IT, _From, State) ->
try try
emqx_machine_boot:stop_apps(normal) emqx_machine_boot:stop_apps()
catch catch
C : E : St -> C : E : St ->
Apps = [element(1, A) || A <- application:which_applications()], Apps = [element(1, A) || A <- application:which_applications()],

View File

@ -33,9 +33,9 @@ end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_common_test_helpers:stop_apps([]).
t_shutdown_reboot(_Config) -> t_shutdown_reboot(_Config) ->
emqx_machine_boot:stop_apps(normal), emqx_machine_boot:stop_apps(),
false = emqx:is_running(node()), false = emqx:is_running(node()),
emqx_machine_boot:ensure_apps_started(), emqx_machine_boot:ensure_apps_started(),
true = emqx:is_running(node()), true = emqx:is_running(node()),
ok = emqx_machine_boot:stop_apps(for_test), ok = emqx_machine_boot:stop_apps(),
false = emqx:is_running(node()). false = emqx:is_running(node()).

View File

@ -102,6 +102,7 @@ mnesia(boot) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link() -> start_link() ->
_ = mria:wait_for_tables([?TELEMETRY]),
Opts = emqx:get_config([telemetry], #{}), Opts = emqx:get_config([telemetry], #{}),
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).

View File

@ -50,7 +50,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}