diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 550e650a2..0f66bc37f 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -28,6 +28,13 @@ -define(MOD_DELAYED_SHARD, emqx_delayed_shard). -define(CM_SHARD, emqx_cm_shard). +-define(EMQX_SHARDS, [ ?ROUTE_SHARD + , ?COMMON_SHARD + , ?SHARED_SUB_SHARD + , ?RULE_ENGINE_SHARD + , ?MOD_DELAYED_SHARD + ]). + %%-------------------------------------------------------------------- %% Banner %%-------------------------------------------------------------------- @@ -149,4 +156,4 @@ -record(chain, { name :: atom() , authenticators :: [#authenticator{}] - }). \ No newline at end of file + }). diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 40a92565b..c04c60b58 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -24,6 +24,7 @@ , get_description/0 , get_release/0 , set_init_config_load_done/0 + , get_init_config_load_done/0 , set_override_conf_file/1 ]). @@ -33,24 +34,14 @@ -define(APP, emqx). --define(EMQX_SHARDS, [ ?ROUTE_SHARD - , ?COMMON_SHARD - , ?SHARED_SUB_SHARD - , ?RULE_ENGINE_SHARD - , ?MOD_DELAYED_SHARD - ]). - - %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- start(_Type, _Args) -> ok = maybe_load_config(), - %% Load application first for ekka_mnesia scanner - ekka:start(), - ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity), ok = maybe_start_quicer(), + start_ekka(), {ok, Sup} = emqx_sup:start_link(), ok = maybe_start_listeners(), ok = emqx_alarm_handler:load(), @@ -70,15 +61,18 @@ stop(_State) -> ok. set_init_config_load_done() -> application:set_env(emqx, init_config_load_done, true). +get_init_config_load_done() -> + application:get_env(emqx, init_config_load_done, false). + %% @doc This API is mostly for testing. %% The override config file is typically located in the 'data' dir when -%% it is a emqx release, but emqx app should not have to konw where the +%% it is a emqx release, but emqx app should not have to know where the %% 'data' dir is located. set_override_conf_file(File) -> application:set_env(emqx, override_conf_file, File). maybe_load_config() -> - case application:get_env(emqx, init_config_load_done, false) of + case get_init_config_load_done() of true -> ok; false -> @@ -86,6 +80,11 @@ maybe_load_config() -> ConfFiles = application:get_env(emqx, config_files, []), emqx_config:init_load(emqx_schema, ConfFiles) end. +%% @doc This API is mostly for testing +%% we already start ekka in emqx_machine +start_ekka() -> + ekka:start(), + ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity). maybe_start_listeners() -> case emqx_boot:is_enabled(listeners) of diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 4fb2913d5..0ecc0361b 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -21,17 +21,8 @@ , is_ready/0 ]). --export([ stop_apps/1 - , ensure_apps_started/0 - ]). - --export([sorted_reboot_apps/0]). - --ifdef(TEST). --export([sorted_reboot_apps/1]). --endif. - -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). %% @doc EMQ X boot entrypoint. start() -> @@ -45,14 +36,10 @@ start() -> ok = print_otp_version_warning(), ok = load_config_files(), - - ok = ensure_apps_started(), - - _ = emqx_plugins:load(), - - ok = print_vsn(), - - ok = start_autocluster(). + %% Load application first for ekka_mnesia scanner + ekka:start(), + ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity), + ok. graceful_shutdown() -> emqx_machine_terminator:graceful_wait(). @@ -74,13 +61,6 @@ print_otp_version_warning() -> [?OTP_RELEASE]). -endif. % OTP_RELEASE > 22 --ifdef(TEST). -print_vsn() -> ok. --else. % TEST -print_vsn() -> - ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). --endif. % TEST - load_config_files() -> %% the app env 'config_files' for 'emqx` app should be set %% in app.time.config by boot script before starting Erlang VM @@ -89,114 +69,3 @@ load_config_files() -> ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), %% to avoid config being loaded again when emqx app starts. ok = emqx_app:set_init_config_load_done(). - -start_autocluster() -> - ekka:callback(prepare, fun ?MODULE:stop_apps/1), - ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), - _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec - ok. - -stop_apps(Reason) -> - ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), - _ = emqx_alarm_handler:unload(), - lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). - -stop_one_app(App) -> - ?SLOG(debug, #{msg => "stopping_app", app => App}), - try - _ = application:stop(App) - catch - C : E -> - ?SLOG(error, #{msg => "failed_to_stop_app", - app => App, - exception => C, - reason => E}) - end. - - -ensure_apps_started() -> - lists:foreach(fun start_one_app/1, sorted_reboot_apps()). - -start_one_app(App) -> - ?SLOG(debug, #{msg => "starting_app", app => App}), - case application:ensure_all_started(App) of - {ok, Apps} -> - ?SLOG(debug, #{msg => "started_apps", apps => Apps}); - {error, Reason} -> - ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), - error({failed_to_start_app, App, Reason}) - end. - -%% list of app names which should be rebooted when: -%% 1. due to static static config change -%% 2. after join a cluster -reboot_apps() -> - [ gproc - , esockd - , ranch - , cowboy - , emqx - , emqx_prometheus - , emqx_modules - , emqx_dashboard - , emqx_connector - , emqx_gateway - , emqx_statsd - , emqx_resource - , emqx_rule_engine - , emqx_bridge - , emqx_bridge_mqtt - , emqx_plugin_libs - , emqx_management - , emqx_retainer - , emqx_exhook - , emqx_authn - , emqx_authz - , emqx_psk - ]. - -sorted_reboot_apps() -> - Apps = [{App, app_deps(App)} || App <- reboot_apps()], - sorted_reboot_apps(Apps). - -app_deps(App) -> - case application:get_key(App, applications) of - undefined -> []; - {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) - end. - -sorted_reboot_apps(Apps) -> - G = digraph:new(), - try - lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), - case digraph_utils:topsort(G) of - Sorted when is_list(Sorted) -> - Sorted; - false -> - Loops = find_loops(G), - error({circular_application_dependency, Loops}) - end - after - digraph:delete(G) - end. - -add_app(G, App, undefined) -> - ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), - %% not loaded - add_app(G, App, []); -add_app(_G, _App, []) -> - ok; -add_app(G, App, [Dep | Deps]) -> - digraph:add_vertex(G, App), - digraph:add_vertex(G, Dep), - digraph:add_edge(G, Dep, App), %% dep -> app as dependency - add_app(G, App, Deps). - -find_loops(G) -> - lists:filtermap( - fun (App) -> - case digraph:get_short_cycle(G, App) of - false -> false; - Apps -> {true, Apps} - end - end, digraph:vertices(G)). diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl new file mode 100644 index 000000000..8fc3a14f4 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -0,0 +1,152 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_machine_boot). + +-include_lib("emqx/include/logger.hrl"). + +-export([post_boot/0]). +-export([stop_apps/1, ensure_apps_started/0]). +-export([sorted_reboot_apps/0]). +-export([start_autocluster/0]). + +-ifdef(TEST). +-export([sorted_reboot_apps/1]). +-endif. + +post_boot() -> + ok = ensure_apps_started(), + _ = emqx_plugins:load(), + ok = print_vsn(), + ok = start_autocluster(), + ignore. + +-ifdef(TEST). +print_vsn() -> ok. +-else. % TEST +print_vsn() -> + ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). +-endif. % TEST + + +start_autocluster() -> + ekka:callback(prepare, fun ?MODULE:stop_apps/1), + ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), + _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec + ok. + +stop_apps(Reason) -> + ?SLOG(info, #{msg => "stopping_apps", reason => Reason}), + _ = emqx_alarm_handler:unload(), + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). + +stop_one_app(App) -> + ?SLOG(debug, #{msg => "stopping_app", app => App}), + try + _ = application:stop(App) + catch + C : E -> + ?SLOG(error, #{msg => "failed_to_stop_app", + app => App, + exception => C, + reason => E}) + end. + + +ensure_apps_started() -> + lists:foreach(fun start_one_app/1, sorted_reboot_apps()). + +start_one_app(App) -> + ?SLOG(debug, #{msg => "starting_app", app => App}), + case application:ensure_all_started(App) of + {ok, Apps} -> + ?SLOG(debug, #{msg => "started_apps", apps => Apps}); + {error, Reason} -> + ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), + error({failed_to_start_app, App, Reason}) + end. + +%% list of app names which should be rebooted when: +%% 1. due to static static config change +%% 2. after join a cluster +reboot_apps() -> + [ gproc + , esockd + , ranch + , cowboy + , emqx + , emqx_prometheus + , emqx_modules + , emqx_dashboard + , emqx_connector + , emqx_gateway + , emqx_statsd + , emqx_resource + , emqx_rule_engine + , emqx_bridge + , emqx_bridge_mqtt + , emqx_plugin_libs + , emqx_management + , emqx_retainer + , emqx_exhook + , emqx_authn + , emqx_authz + ]. + +sorted_reboot_apps() -> + Apps = [{App, app_deps(App)} || App <- reboot_apps()], + sorted_reboot_apps(Apps). + +app_deps(App) -> + case application:get_key(App, applications) of + undefined -> []; + {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) + end. + +sorted_reboot_apps(Apps) -> + G = digraph:new(), + try + lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), + case digraph_utils:topsort(G) of + Sorted when is_list(Sorted) -> + Sorted; + false -> + Loops = find_loops(G), + error({circular_application_dependency, Loops}) + end + after + digraph:delete(G) + end. + +add_app(G, App, undefined) -> + ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), + %% not loaded + add_app(G, App, []); +add_app(_G, _App, []) -> + ok; +add_app(G, App, [Dep | Deps]) -> + digraph:add_vertex(G, App), + digraph:add_vertex(G, Dep), + digraph:add_edge(G, Dep, App), %% dep -> app as dependency + add_app(G, App, Deps). + +find_loops(G) -> + lists:filtermap( + fun (App) -> + case digraph:get_short_cycle(G, App) of + false -> false; + Apps -> {true, Apps} + end + end, digraph:vertices(G)). diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 798beee1c..406e1d483 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -33,7 +33,8 @@ init([]) -> Terminator = child_worker(emqx_machine_terminator, [], transient), ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent), ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent), - Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler], + BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary), + Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler, BootApps], SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 @@ -41,8 +42,11 @@ init([]) -> {ok, {SupFlags, Children}}. child_worker(M, Args, Restart) -> + child_worker(M, start_link, Args, Restart). + +child_worker(M, Func, Args, Restart) -> #{id => M, - start => {M, start_link, Args}, + start => {M, Func, Args}, restart => Restart, shutdown => 5000, type => worker, diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 74479a6d9..733c1a5dc 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -80,7 +80,7 @@ handle_cast(_Cast, State) -> handle_call(?DO_IT, _From, State) -> try - emqx_machine:stop_apps(normal) + emqx_machine_boot:stop_apps(normal) catch C : E : St -> Apps = [element(1, A) || A <- application:which_applications()], diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index 51cf4f8b4..1e90d867b 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -33,9 +33,9 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_shutdown_reboot(_Config) -> - emqx_machine:stop_apps(normal), + emqx_machine_boot:stop_apps(normal), false = emqx:is_running(node()), - emqx_machine:ensure_apps_started(), + emqx_machine_boot:ensure_apps_started(), true = emqx:is_running(node()), - ok = emqx_machine:stop_apps(for_test), + ok = emqx_machine_boot:stop_apps(for_test), false = emqx:is_running(node()). diff --git a/apps/emqx_machine/test/emqx_machine_tests.erl b/apps/emqx_machine/test/emqx_machine_tests.erl index dded07570..1a562b815 100644 --- a/apps/emqx_machine/test/emqx_machine_tests.erl +++ b/apps/emqx_machine/test/emqx_machine_tests.erl @@ -38,7 +38,7 @@ sorted_reboot_apps_cycle_test() -> check_order(Apps) -> AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])), - Sorted = emqx_machine:sorted_reboot_apps(Apps), + Sorted = emqx_machine_boot:sorted_reboot_apps(Apps), case length(AllApps) =:= length(Sorted) of true -> ok; false -> error({AllApps, Sorted})