Merge pull request #8035 from ieQu1/master
Improve EMQX startup sequence
This commit is contained in:
commit
3623acb20f
|
@ -354,8 +354,10 @@ handle_cast(Msg, State) ->
|
|||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
||||
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
||||
handle_info(
|
||||
{mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}},
|
||||
State = #state{pmon = PMon}
|
||||
) ->
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
||||
%% it `unsubscribed` the last topic.
|
||||
|
|
|
@ -29,6 +29,9 @@
|
|||
%% these apps are always (re)started by emqx_machine
|
||||
-define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx]).
|
||||
|
||||
%% If any of these applications crash, the entire EMQX node shuts down
|
||||
-define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]).
|
||||
|
||||
post_boot() ->
|
||||
ok = ensure_apps_started(),
|
||||
ok = print_vsn(),
|
||||
|
@ -76,7 +79,7 @@ ensure_apps_started() ->
|
|||
|
||||
start_one_app(App) ->
|
||||
?SLOG(debug, #{msg => "starting_app", app => App}),
|
||||
case application:ensure_all_started(App) of
|
||||
case application:ensure_all_started(App, restart_type(App)) of
|
||||
{ok, Apps} ->
|
||||
?SLOG(debug, #{msg => "started_apps", apps => Apps});
|
||||
{error, Reason} ->
|
||||
|
@ -84,6 +87,16 @@ start_one_app(App) ->
|
|||
error({failed_to_start_app, App, Reason})
|
||||
end.
|
||||
|
||||
%% Pick the restart type handed to application:ensure_all_started/2 for `App'.
%% An app listed in ?BASIC_PERMANENT_APPS, or in the `permanent_applications'
%% environment variable of emqx_machine (defaults to []), is `permanent';
%% every other app is `temporary'.
restart_type(App) ->
    ExtraPermanent = application:get_env(emqx_machine, permanent_applications, []),
    IsPermanent = lists:member(App, ?BASIC_PERMANENT_APPS ++ ExtraPermanent),
    if
        IsPermanent -> permanent;
        true -> temporary
    end.
|
||||
|
||||
%% list of app names which should be rebooted when:
|
||||
%% 1. due to static config change
|
||||
%% 2. after joining a cluster
|
||||
|
|
|
@ -98,17 +98,6 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
start_link() ->
|
||||
ok = mria:create_table(
|
||||
?TELEMETRY,
|
||||
[
|
||||
{type, set},
|
||||
{storage, disc_copies},
|
||||
{rlog_shard, ?TELEMETRY_SHARD},
|
||||
{record_name, telemetry},
|
||||
{attributes, record_info(fields, telemetry)}
|
||||
]
|
||||
),
|
||||
_ = mria:wait_for_tables([?TELEMETRY]),
|
||||
Opts = emqx:get_config([telemetry], #{}),
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
||||
|
||||
|
@ -134,16 +123,27 @@ get_telemetry() ->
|
|||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% This is to suppress dialyzer warnings for mria:dirty_write and
|
||||
%% dirty_read race condition. Given that the init function is not evaluated
|
||||
%% concurrently in one node, it should be free of race condition.
|
||||
%% Given the chance of having two nodes bootstrapping with the write
|
||||
%% is very small, it should be safe to ignore.
|
||||
-dialyzer([{nowarn_function, [init/1]}]).
|
||||
%% gen_server init/1 callback. The argument is ignored; the server starts
%% with an `undefined' placeholder state and asks gen_server to invoke
%% handle_continue(init, _) next, so the remaining setup runs there instead
%% of inside init/1.
init(_Opts) ->
    Continue = {continue, init},
    {ok, undefined, Continue}.
|
||||
|
||||
handle_continue(init, _) ->
|
||||
ok = mria:create_table(
|
||||
?TELEMETRY,
|
||||
[
|
||||
{type, set},
|
||||
{storage, disc_copies},
|
||||
{rlog_shard, ?TELEMETRY_SHARD},
|
||||
{record_name, telemetry},
|
||||
{attributes, record_info(fields, telemetry)}
|
||||
]
|
||||
),
|
||||
ok = mria:wait_for_tables([?TELEMETRY]),
|
||||
State0 = empty_state(),
|
||||
{NodeUUID, ClusterUUID} = ensure_uuids(),
|
||||
{ok, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}.
|
||||
{noreply, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}};
|
||||
handle_continue(Continue, State) ->
|
||||
?SLOG(error, #{msg => "unexpected_continue", continue => Continue}),
|
||||
{noreply, State}.
|
||||
|
||||
handle_call(enable, _From, State) ->
|
||||
%% Wait a few moments before reporting the first telemetry, as the
|
||||
|
@ -170,10 +170,6 @@ handle_cast(Msg, State) ->
|
|||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||
{noreply, State}.
|
||||
|
||||
handle_continue(Continue, State) ->
|
||||
?SLOG(error, #{msg => "unexpected_continue", continue => Continue}),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
|
||||
State = report_telemetry(State0),
|
||||
{noreply, ensure_report_timer(State)};
|
||||
|
|
|
@ -57,7 +57,7 @@ overrides() ->
|
|||
%% Temporary workaround for a rebar3 erl_opts duplication
|
||||
%% bug. Ideally, we want to set this define globally
|
||||
snabbkaffe_overrides() ->
|
||||
Apps = [snabbkaffe, ekka, mria],
|
||||
Apps = [snabbkaffe, ekka, mria, gen_rpc],
|
||||
[{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps].
|
||||
|
||||
config() ->
|
||||
|
|
Loading…
Reference in New Issue