From 6697b9fa4250b3a55fb82b7af581697c6f7dca84 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 21 Oct 2021 08:49:21 +0200 Subject: [PATCH] fix(emqx_machine): Fix start/stop callbacks (#5969) * fix(emqx_machine): Fix start/stop callbacks * chore(ekka): Bump version to 0.11.1 * fix(router): Wait for the tables * fix(emqx_cluster_rpc): Stop cluster RPC when joining a cluster * fix(emqx_app): Fix a deadlock when joining the cluster * fix(emqx_telemetry): Wait for mnesia tables * test(ct_helper): Start ekka before emqx --- apps/emqx/rebar.config | 2 +- apps/emqx/src/emqx_app.erl | 5 ++- apps/emqx/src/emqx_router_helper.erl | 1 + apps/emqx/test/emqx_common_test_helpers.erl | 3 +- apps/emqx_machine/src/emqx_cluster_rpc.erl | 1 + apps/emqx_machine/src/emqx_machine_boot.erl | 18 +++++++---- apps/emqx_machine/src/emqx_machine_sup.erl | 32 +++++++++++++++++++ .../src/emqx_machine_terminator.erl | 2 +- apps/emqx_machine/test/emqx_machine_SUITE.erl | 4 +-- apps/emqx_modules/src/emqx_telemetry.erl | 1 + rebar.config | 2 +- 11 files changed, 55 insertions(+), 16 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 714693d88..4ea0b06be 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.3"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 662439397..504b245f3 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -41,7 +41,7 @@ start(_Type, _Args) -> ok = maybe_load_config(), ok = maybe_start_quicer(), - ensure_ekka_started(), + wait_boot_shards(), {ok, Sup} = emqx_sup:start_link(), ok = maybe_start_listeners(), ok = emqx_alarm_handler:load(), @@ -55,8 +55,7 @@ prep_stop(_State) -> stop(_State) -> ok. -ensure_ekka_started() -> - ekka:start(), +wait_boot_shards() -> ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity). %% @doc Call this function to make emqx boot without loading config, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index aecce70ac..cc5fc8708 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -92,6 +92,7 @@ monitor(Node) when is_atom(Node) -> init([]) -> ok = ekka:monitor(membership), + _ = mria:wait_for_tables([?ROUTING_NODE]), {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}), Nodes = lists:foldl( fun(Node, Acc) -> diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index a470370e6..f9cc2d0b0 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -133,6 +133,7 @@ start_apps(Apps, Handler) when is_function(Handler) -> %% Load all application code to beam vm first %% Because, minirest, ekka etc.. application will scan these modules lists:foreach(fun load/1, [emqx | Apps]), + ekka:start(), lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]). load(App) -> @@ -195,7 +196,7 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) -> -spec(stop_apps(list()) -> ok). stop_apps(Apps) -> - [application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]], + [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], ok. %% backward compatible diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index a55d17616..479561206 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -135,6 +135,7 @@ skip_failed_commit(Node) -> %% @private init([Node, RetryMs]) -> + _ = mria:wait_for_tables([?CLUSTER_MFA]), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 83c47331e..ed3b21a9d 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -18,7 +18,7 @@ -include_lib("emqx/include/logger.hrl"). -export([post_boot/0]). --export([stop_apps/1, ensure_apps_started/0]). +-export([stop_apps/0, ensure_apps_started/0]). -export([sorted_reboot_apps/0]). -export([start_autocluster/0]). @@ -42,15 +42,16 @@ print_vsn() -> start_autocluster() -> - ekka:callback(prepare, fun ?MODULE:stop_apps/1), - ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), + ekka:callback(stop, fun emqx_machine_boot:stop_apps/0), + ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0), _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec ok. -stop_apps(Reason) -> - ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), +stop_apps() -> + ?SLOG(notice, #{msg => "stopping_emqx_apps"}), _ = emqx_alarm_handler:unload(), - lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())), + emqx_machine_sup:stop_cluster_rpc(). stop_one_app(App) -> ?SLOG(debug, #{msg => "stopping_app", app => App}), @@ -64,8 +65,11 @@ stop_one_app(App) -> reason => E}) end. - ensure_apps_started() -> + ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}), + %% FIXME: Hack spawning the cluster RPC asynchronously to avoid a + %% deadlock somewhere in EMQ X startup + spawn_link(fun() -> emqx_machine_sup:start_cluster_rpc() end), lists:foreach(fun start_one_app/1, sorted_reboot_apps()). start_one_app(App) -> diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 406e1d483..bf5403d43 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -21,6 +21,8 @@ -behaviour(supervisor). -export([ start_link/0 + , stop_cluster_rpc/0 + , start_cluster_rpc/0 ]). -export([init/1]). @@ -28,6 +30,26 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +stop_cluster_rpc() -> + case whereis(?MODULE) of + undefined -> + ok; + _ -> + _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc_handler), + _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc), + ok + end. + +start_cluster_rpc() -> + case whereis(?MODULE) of + undefined -> + ok; + _ -> + ensure_running(emqx_cluster_rpc), + ensure_running(emqx_cluster_rpc_handler), + ok + end. + init([]) -> GlobalGC = child_worker(emqx_global_gc, [], permanent), Terminator = child_worker(emqx_machine_terminator, [], transient), @@ -52,3 +74,13 @@ child_worker(M, Func, Args, Restart) -> type => worker, modules => [M] }. + +ensure_running(Id) -> + %% Assuming Id == locally registered name + case whereis(Id) of + undefined -> + _ = supervisor:restart_child(?MODULE, Id), + ok; + _ -> + ok + end. diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 733c1a5dc..3f6c29407 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -80,7 +80,7 @@ handle_cast(_Cast, State) -> handle_call(?DO_IT, _From, State) -> try - emqx_machine_boot:stop_apps(normal) + emqx_machine_boot:stop_apps() catch C : E : St -> Apps = [element(1, A) || A <- application:which_applications()], diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index 95ce23e11..cce0778e2 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -33,9 +33,9 @@ end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). t_shutdown_reboot(_Config) -> - emqx_machine_boot:stop_apps(normal), + emqx_machine_boot:stop_apps(), false = emqx:is_running(node()), emqx_machine_boot:ensure_apps_started(), true = emqx:is_running(node()), - ok = emqx_machine_boot:stop_apps(for_test), + ok = emqx_machine_boot:stop_apps(), false = emqx:is_running(node()). diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 17f32ee58..e81d1257d 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -102,6 +102,7 @@ mnesia(boot) -> %%-------------------------------------------------------------------- start_link() -> + _ = mria:wait_for_tables([?TELEMETRY]), Opts = emqx:get_config([telemetry], #{}), gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). diff --git a/rebar.config b/rebar.config index 030056bac..650b8ca9f 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}