From 9474241ad958be7272a3d0a3e32f3dda9ec36951 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 24 May 2022 11:09:17 +0200 Subject: [PATCH 1/4] fix(emqx_telemetry): Async start of telemetry module --- apps/emqx_modules/src/emqx_telemetry.erl | 40 +++++++++++------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 02e3bc3f9..df4f51c4c 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -98,17 +98,6 @@ %%-------------------------------------------------------------------- start_link() -> - ok = mria:create_table( - ?TELEMETRY, - [ - {type, set}, - {storage, disc_copies}, - {rlog_shard, ?TELEMETRY_SHARD}, - {record_name, telemetry}, - {attributes, record_info(fields, telemetry)} - ] - ), - _ = mria:wait_for_tables([?TELEMETRY]), Opts = emqx:get_config([telemetry], #{}), gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). @@ -134,16 +123,27 @@ get_telemetry() -> %% gen_server callbacks %%-------------------------------------------------------------------- -%% This is to suppress dialyzer warnings for mria:dirty_write and -%% dirty_read race condition. Given that the init function is not evaluated -%% concurrently in one node, it should be free of race condition. -%% Given the chance of having two nodes bootstraping with the write -%% is very small, it should be safe to ignore. --dialyzer([{nowarn_function, [init/1]}]). init(_Opts) -> + {ok, undefined, {continue, init}}. + +handle_continue(init, _) -> + ok = mria:create_table( + ?TELEMETRY, + [ + {type, set}, + {storage, disc_copies}, + {rlog_shard, ?TELEMETRY_SHARD}, + {record_name, telemetry}, + {attributes, record_info(fields, telemetry)} + ] + ), + ok = mria:wait_for_tables([?TELEMETRY]), State0 = empty_state(), {NodeUUID, ClusterUUID} = ensure_uuids(), - {ok, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}. + {noreply, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}; +handle_continue(Continue, State) -> + ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}), + {noreply, State}. handle_call(enable, _From, State) -> %% Wait a few moments before reporting the first telemetry, as the @@ -170,10 +170,6 @@ handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. -handle_continue(Continue, State) -> - ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}), - {noreply, State}. - handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) -> State = report_telemetry(State0), {noreply, ensure_report_timer(State)}; From 27c922dbf2cd4abf29ecfbb893f8f54c1999036b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 24 May 2022 12:39:37 +0200 Subject: [PATCH 2/4] fix(emqx_machine): Start essential applications as permanent --- apps/emqx_machine/src/emqx_machine_boot.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 638b1488f..dbbf12da4 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -29,6 +29,9 @@ %% these apps are always (re)started by emqx_machine -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx]). +%% If any of these applications crash, the entire EMQX node shuts down +-define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]). + post_boot() -> ok = ensure_apps_started(), ok = print_vsn(), @@ -76,7 +79,7 @@ ensure_apps_started() -> start_one_app(App) -> ?SLOG(debug, #{msg => "starting_app", app => App}), - case application:ensure_all_started(App) of + case application:ensure_all_started(App, restart_type(App)) of {ok, Apps} -> ?SLOG(debug, #{msg => "started_apps", apps => Apps}); {error, Reason} -> @@ -84,6 +87,16 @@ start_one_app(App) -> error({failed_to_start_app, App, Reason}) end. +restart_type(App) -> + PermanentApps = + ?BASIC_PERMANENT_APPS ++ application:get_env(emqx_machine, permanent_applications, []), + case lists:member(App, PermanentApps) of + true -> + permanent; + false -> + temporary + end. + %% list of app names which should be rebooted when: %% 1. due to static config change %% 2. after join a cluster From 9de609224e5438ffcd4e1d7df4bff220dbb9e2bf Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 24 May 2022 12:40:24 +0200 Subject: [PATCH 3/4] fix(logging): Add compile-time overrides for gen_rpc application --- rebar.config.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config.erl b/rebar.config.erl index c7ef3ef2c..1d41e7768 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -57,7 +57,7 @@ overrides() -> %% Temporary workaround for a rebar3 erl_opts duplication %% bug. Ideally, we want to set this define globally snabbkaffe_overrides() -> - Apps = [snabbkaffe, ekka, mria], + Apps = [snabbkaffe, ekka, mria, gen_rpc], [{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps]. config() -> From e7d70c142c4bb9fa02e596e939317c0cf37515e0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 24 May 2022 13:03:18 +0200 Subject: [PATCH 4/4] fix(emqx_shared_sub): Don't crash on table schema change --- apps/emqx/src/emqx_shared_sub.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ee45b991d..5539c4427 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -354,8 +354,10 @@ handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. -handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> - #emqx_shared_subscription{subpid = SubPid} = NewRecord, +handle_info( + {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}}, + State = #state{pmon = PMon} +) -> {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until %% it `unsubscribed` the last topic.