fix(machine_boot): use shared list of reboot apps and add bridges to reboot list

This commit is contained in:
Thales Macedo Garitezi 2023-07-19 14:59:25 -03:00
parent 57e39f42c5
commit b9b11d8f4d
7 changed files with 267 additions and 275 deletions

View File

@ -295,7 +295,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt))))
.PHONY: fmt
fmt: $(REBAR)
@$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}'
@$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,priv,test}/**/*.{erl,hrl,app.src,eterm}'
@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
@mix format

View File

@ -547,6 +547,7 @@ start_cluster(Cluster) ->
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
NumNodes = length(Nodes),
on_exit(fun() ->
emqx_utils:pmap(
fun(N) ->
@ -556,6 +557,11 @@ start_cluster(Cluster) ->
Nodes
)
end),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
),
Nodes.
kill_resource_managers() ->

View File

@ -0,0 +1,110 @@
%% -*- mode: erlang; -*-
#{
%% must always be of type `load'
db_apps =>
[
mnesia_rocksdb,
mnesia,
mria,
ekka
],
system_apps =>
[
kernel,
sasl,
crypto,
public_key,
asn1,
syntax_tools,
ssl,
os_mon,
inets,
compiler,
runtime_tools,
redbug,
xmerl,
{hocon, load},
telemetry
],
%% must always be of type `load'
common_business_apps =>
[
emqx,
emqx_conf,
esasl,
observer_cli,
tools,
covertool,
%% started by emqx_machine
system_monitor,
emqx_utils,
emqx_http_lib,
emqx_resource,
emqx_connector,
emqx_authn,
emqx_authz,
emqx_auto_subscribe,
emqx_gateway,
emqx_gateway_stomp,
emqx_gateway_mqttsn,
emqx_gateway_coap,
emqx_gateway_lwm2m,
emqx_gateway_exproto,
emqx_exhook,
emqx_bridge,
emqx_bridge_mqtt,
emqx_bridge_http,
emqx_rule_engine,
emqx_modules,
emqx_management,
emqx_dashboard,
emqx_retainer,
emqx_prometheus,
emqx_psk,
emqx_slow_subs,
emqx_mongodb,
emqx_redis,
emqx_mysql,
emqx_plugins,
quicer,
bcrypt,
jq,
observer
],
%% must always be of type `load'
ee_business_apps =>
[
emqx_license,
emqx_enterprise,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,
emqx_bridge_cassandra,
emqx_bridge_opents,
emqx_bridge_clickhouse,
emqx_bridge_dynamo,
emqx_bridge_hstreamdb,
emqx_bridge_influxdb,
emqx_bridge_iotdb,
emqx_bridge_matrix,
emqx_bridge_mongodb,
emqx_bridge_mysql,
emqx_bridge_pgsql,
emqx_bridge_redis,
emqx_bridge_rocketmq,
emqx_bridge_tdengine,
emqx_bridge_timescale,
emqx_bridge_sqlserver,
emqx_oracle,
emqx_bridge_oracle,
emqx_bridge_rabbitmq,
emqx_schema_registry,
emqx_eviction_agent,
emqx_node_rebalance,
emqx_ft
],
%% must always be of type `load'
ce_business_apps =>
[emqx_telemetry]
}.

View File

@ -16,6 +16,7 @@
-module(emqx_machine_boot).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([post_boot/0]).
-export([stop_apps/0, ensure_apps_started/0]).
@ -24,7 +25,6 @@
-export([stop_port_apps/0]).
-dialyzer({no_match, [basic_reboot_apps/0]}).
-dialyzer({no_match, [basic_reboot_apps_edition/1]}).
-ifdef(TEST).
-export([sorted_reboot_apps/1, reboot_apps/0]).
@ -94,7 +94,8 @@ stop_one_app(App) ->
ensure_apps_started() ->
?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
lists:foreach(fun start_one_app/1, sorted_reboot_apps()),
?tp(emqx_machine_boot_apps_started, #{}).
start_one_app(App) ->
?SLOG(debug, #{msg => "starting_app", app => App}),
@ -128,41 +129,62 @@ reboot_apps() ->
BaseRebootApps ++ ConfigApps.
basic_reboot_apps() ->
?BASIC_REBOOT_APPS ++
[
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_slow_subs,
emqx_auto_subscribe,
emqx_plugins,
emqx_psk
] ++ basic_reboot_apps_edition(emqx_release:edition()).
PrivDir = code:priv_dir(emqx_machine),
RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
{ok, [
#{
common_business_apps := CommonBusinessApps0,
ee_business_apps := EEBusinessApps,
ce_business_apps := CEBusinessApps
}
]} = file:consult(RebootListPath),
Filters0 = maps:from_list([
{App, is_app(App)}
|| App <- [quicer, bcrypt, jq, observer]
]),
CommonBusinessApps =
filter(
CommonBusinessApps0,
%% We don't need to restart these
Filters0#{
system_monitor => false,
observer => false,
quicer => false
}
),
EditionSpecificApps =
case emqx_release:edition() of
ee -> EEBusinessApps;
ce -> CEBusinessApps;
_ -> []
end,
BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
?BASIC_REBOOT_APPS ++ BusinessApps.
basic_reboot_apps_edition(ce) ->
[emqx_telemetry];
basic_reboot_apps_edition(ee) ->
[
emqx_license,
emqx_s3,
emqx_ft,
emqx_eviction_agent,
emqx_node_rebalance,
emqx_schema_registry
];
%% unexcepted edition, should not happen
basic_reboot_apps_edition(_) ->
[].
filter(AppList, Filters) ->
lists:foldr(
fun(App, Acc) ->
AppName =
case App of
{Name, _Type} -> Name;
Name when is_atom(Name) -> Name
end,
ShouldKeep = maps:get(AppName, Filters, true),
case ShouldKeep of
true -> [App | Acc];
false -> Acc
end
end,
[],
AppList
).
is_app(Name) ->
case application:load(Name) of
ok -> true;
{error, {already_loaded, _}} -> true;
_ -> false
end.
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],

View File

@ -364,11 +364,7 @@ cluster(Config) ->
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_enterprise_schema},
%% need to restart schema registry app in the tests so
%% that it re-registers the config handler that is lost
%% when emqx_conf restarts during join.
{env, [{emqx_machine, applications, [emqx_schema_registry]}]},
{load_apps, [emqx_machine | ?APPS]},
{load_apps, [emqx_machine]},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
@ -388,6 +384,7 @@ start_cluster(Cluster) ->
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
NumNodes = length(Nodes),
on_exit(fun() ->
emqx_utils:pmap(
fun(N) ->
@ -397,7 +394,11 @@ start_cluster(Cluster) ->
Nodes
)
end),
erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
),
Nodes.
wait_for_cluster_rpc(Node) ->
@ -658,7 +659,7 @@ t_cluster_serde_build(Config) ->
Nodes = [N1, N2 | _] = start_cluster(Cluster),
NumNodes = length(Nodes),
wait_for_cluster_rpc(N2),
?assertEqual(
?assertMatch(
ok,
erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])
),
@ -687,7 +688,7 @@ t_cluster_serde_build(Config) ->
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
NumNodes,
5_000
10_000
),
?assertEqual(
ok,

170
mix.exs
View File

@ -297,6 +297,7 @@ defmodule EMQXUmbrella.MixProject do
[
applications: applications(edition_type),
skip_mode_validation_for: [
:emqx_mix,
:emqx_gateway,
:emqx_gateway_stomp,
:emqx_gateway_mqttsn,
@ -316,7 +317,10 @@ defmodule EMQXUmbrella.MixProject do
:emqx_auto_subscribe,
:emqx_slow_subs,
:emqx_plugins,
:emqx_ft
:emqx_ft,
:emqx_s3,
:emqx_durable_storage,
:rabbit_common
],
steps: steps,
strip_beams: false
@ -326,137 +330,57 @@ defmodule EMQXUmbrella.MixProject do
end
def applications(edition_type) do
system_apps = [
crypto: :permanent,
public_key: :permanent,
asn1: :permanent,
syntax_tools: :permanent,
ssl: :permanent,
os_mon: :permanent,
inets: :permanent,
compiler: :permanent,
runtime_tools: :permanent,
redbug: :permanent,
xmerl: :permanent,
hocon: :load,
telemetry: :permanent
]
{:ok,
[
%{
db_apps: db_apps,
system_apps: system_apps,
common_business_apps: common_business_apps,
ee_business_apps: ee_business_apps,
ce_business_apps: ce_business_apps
}
]} = :file.consult("apps/emqx_machine/priv/reboot_lists.eterm")
db_apps =
if enable_rocksdb?() do
[:mnesia_rocksdb]
db_apps = filter(db_apps, %{mnesia_rocksdb: enable_rocksdb?()})
common_business_apps =
filter(common_business_apps, %{
quicer: enable_quicer?(),
bcrypt: enable_bcrypt?(),
jq: enable_jq?(),
observer: is_app?(:observer)
})
edition_specific_apps =
if edition_type == :enterprise do
ee_business_apps
else
[]
end ++
[
:mnesia,
:mria,
:ekka
]
ce_business_apps
end
business_apps =
[
:emqx,
:emqx_conf,
:esasl,
:observer_cli,
:tools,
:covertool,
:system_monitor,
:emqx_utils,
:emqx_http_lib,
:emqx_resource,
:emqx_connector,
:emqx_authn,
:emqx_authz,
:emqx_auto_subscribe,
:emqx_gateway,
:emqx_gateway_stomp,
:emqx_gateway_mqttsn,
:emqx_gateway_coap,
:emqx_gateway_lwm2m,
:emqx_gateway_exproto,
:emqx_exhook,
:emqx_bridge,
:emqx_bridge_mqtt,
:emqx_bridge_http,
:emqx_rule_engine,
:emqx_modules,
:emqx_management,
:emqx_dashboard,
:emqx_retainer,
:emqx_prometheus,
:emqx_psk,
:emqx_slow_subs,
:emqx_mongodb,
:emqx_redis,
:emqx_mysql,
:emqx_plugins,
:emqx_mix
] ++
if enable_quicer?() do
[:quicer]
else
[]
end ++
if enable_bcrypt?() do
[:bcrypt]
else
[]
end ++
if enable_jq?() do
[:jq]
else
[]
end ++
if(is_app(:observer),
do: [:observer],
else: []
) ++
case edition_type do
:enterprise ->
[
:emqx_license,
:emqx_enterprise,
:emqx_bridge_kafka,
:emqx_bridge_pulsar,
:emqx_bridge_gcp_pubsub,
:emqx_bridge_cassandra,
:emqx_bridge_opents,
:emqx_bridge_clickhouse,
:emqx_bridge_dynamo,
:emqx_bridge_hstreamdb,
:emqx_bridge_influxdb,
:emqx_bridge_iotdb,
:emqx_bridge_matrix,
:emqx_bridge_mongodb,
:emqx_bridge_mysql,
:emqx_bridge_pgsql,
:emqx_bridge_redis,
:emqx_bridge_rocketmq,
:emqx_bridge_tdengine,
:emqx_bridge_timescale,
:emqx_bridge_sqlserver,
:emqx_oracle,
:emqx_bridge_oracle,
:emqx_bridge_rabbitmq,
:emqx_schema_registry,
:emqx_eviction_agent,
:emqx_node_rebalance,
:emqx_ft
]
business_apps = common_business_apps ++ edition_specific_apps
_ ->
[:emqx_telemetry]
end
system_apps ++
Enum.map(system_apps, fn app ->
if is_atom(app), do: {app, :permanent}, else: app
end) ++
Enum.map(db_apps, &{&1, :load}) ++
[emqx_machine: :permanent] ++
Enum.map(business_apps, &{&1, :load})
end
defp is_app(name) do
defp filter(apps, filters) do
Enum.filter(apps, fn app ->
app_name =
case app do
{app_name, _type} -> app_name
app_name when is_atom(app_name) -> app_name
end
Map.get(filters, app_name, true)
end)
end
defp is_app?(name) do
case Application.load(name) do
:ok ->
true

View File

@ -386,85 +386,48 @@ overlay_vars_pkg(pkg) ->
].
relx_apps(ReleaseType, Edition) ->
SystemApps =
[
kernel,
sasl,
crypto,
public_key,
asn1,
syntax_tools,
ssl,
os_mon,
inets,
compiler,
runtime_tools,
redbug,
xmerl,
{hocon, load},
telemetry
],
DBApps =
[mnesia_rocksdb || is_rocksdb_supported()] ++
[
mnesia,
mria,
ekka
],
BusinessApps =
[
emqx,
emqx_conf,
esasl,
observer_cli,
tools,
covertool,
% started by emqx_machine
system_monitor,
emqx_utils,
emqx_http_lib,
emqx_resource,
emqx_connector,
emqx_authn,
emqx_authz,
emqx_auto_subscribe,
emqx_gateway,
emqx_gateway_stomp,
emqx_gateway_mqttsn,
emqx_gateway_coap,
emqx_gateway_lwm2m,
emqx_gateway_exproto,
emqx_exhook,
emqx_bridge,
emqx_bridge_mqtt,
emqx_bridge_http,
emqx_rule_engine,
emqx_modules,
emqx_management,
emqx_dashboard,
emqx_retainer,
emqx_prometheus,
emqx_psk,
emqx_slow_subs,
emqx_mongodb,
emqx_redis,
emqx_mysql,
emqx_plugins
] ++
[quicer || is_quicer_supported()] ++
[bcrypt || provide_bcrypt_release(ReleaseType)] ++
%% Started automatically when needed (only needs to be started when the
%% port implementation is used)
[jq || is_jq_supported()] ++
[observer || is_app(observer)] ++
relx_apps_per_edition(Edition),
{ok, [
#{
db_apps := DBApps0,
system_apps := SystemApps,
common_business_apps := CommonBusinessApps0,
ee_business_apps := EEBusinessApps,
ce_business_apps := CEBusinessApps
}
]} = file:consult("apps/emqx_machine/priv/reboot_lists.eterm"),
DBApps = filter(DBApps0, #{mnesia_rocksdb => is_rocksdb_supported()}),
CommonBusinessApps =
filter(CommonBusinessApps0, #{
quicer => is_quicer_supported(),
bcrypt => provide_bcrypt_release(ReleaseType),
jq => is_jq_supported(),
observer => is_app(observer)
}),
EditionSpecificApps =
case Edition of
ee -> EEBusinessApps;
ce -> CEBusinessApps
end,
BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
SystemApps ++
%% EMQX starts the DB and the business applications:
[{App, load} || App <- DBApps] ++
[emqx_machine] ++
[{App, load} || App <- BusinessApps].
filter(AppList, Filters) ->
lists:filter(
fun(App) ->
AppName =
case App of
{Name, _Type} -> Name;
Name when is_atom(Name) -> Name
end,
maps:get(AppName, Filters, true)
end,
AppList
).
is_app(Name) ->
case application:load(Name) of
ok -> true;
@ -472,40 +435,6 @@ is_app(Name) ->
_ -> false
end.
relx_apps_per_edition(ee) ->
[
emqx_license,
emqx_enterprise,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,
emqx_bridge_cassandra,
emqx_bridge_opents,
emqx_bridge_clickhouse,
emqx_bridge_dynamo,
emqx_bridge_hstreamdb,
emqx_bridge_influxdb,
emqx_bridge_iotdb,
emqx_bridge_matrix,
emqx_bridge_mongodb,
emqx_bridge_mysql,
emqx_bridge_pgsql,
emqx_bridge_redis,
emqx_bridge_rocketmq,
emqx_bridge_tdengine,
emqx_bridge_timescale,
emqx_bridge_sqlserver,
emqx_oracle,
emqx_bridge_oracle,
emqx_bridge_rabbitmq,
emqx_schema_registry,
emqx_eviction_agent,
emqx_node_rebalance,
emqx_ft
];
relx_apps_per_edition(ce) ->
[emqx_telemetry].
relx_overlay(ReleaseType, Edition) ->
[
{mkdir, "log/"},