From b9b11d8f4ddf04f456a3e47eacac3ba00ddfcc00 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 19 Jul 2023 14:59:25 -0300 Subject: [PATCH] fix(machine_boot): use shared list of reboot apps and add bridges to reboot list --- Makefile | 2 +- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 6 + apps/emqx_machine/priv/reboot_lists.eterm | 110 ++++++++++++ apps/emqx_machine/src/emqx_machine_boot.erl | 94 ++++++---- .../test/emqx_schema_registry_SUITE.erl | 17 +- mix.exs | 170 +++++------------- rebar.config.erl | 143 ++++----------- 7 files changed, 267 insertions(+), 275 deletions(-) create mode 100644 apps/emqx_machine/priv/reboot_lists.eterm diff --git a/Makefile b/Makefile index c51966232..948d1d20b 100644 --- a/Makefile +++ b/Makefile @@ -295,7 +295,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt)))) .PHONY: fmt fmt: $(REBAR) - @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}' + @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,priv,test}/**/*.{erl,hrl,app.src,eterm}' @$(SCRIPTS)/erlfmt -w 'rebar.config.erl' @mix format diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 15d4b63d4..4f0f73732 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -547,6 +547,7 @@ start_cluster(Cluster) -> emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster ], + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -556,6 +557,11 @@ start_cluster(Cluster) -> Nodes ) end), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. kill_resource_managers() -> diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm new file mode 100644 index 000000000..62298697c --- /dev/null +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -0,0 +1,110 @@ +%% -*- mode: erlang; -*- +#{ + %% must always be of type `load' + db_apps => + [ + mnesia_rocksdb, + mnesia, + mria, + ekka + ], + system_apps => + [ + kernel, + sasl, + crypto, + public_key, + asn1, + syntax_tools, + ssl, + os_mon, + inets, + compiler, + runtime_tools, + redbug, + xmerl, + {hocon, load}, + telemetry + ], + %% must always be of type `load' + common_business_apps => + [ + emqx, + emqx_conf, + + esasl, + observer_cli, + tools, + covertool, + %% started by emqx_machine + system_monitor, + emqx_utils, + emqx_http_lib, + emqx_resource, + emqx_connector, + emqx_authn, + emqx_authz, + emqx_auto_subscribe, + emqx_gateway, + emqx_gateway_stomp, + emqx_gateway_mqttsn, + emqx_gateway_coap, + emqx_gateway_lwm2m, + emqx_gateway_exproto, + emqx_exhook, + emqx_bridge, + emqx_bridge_mqtt, + emqx_bridge_http, + emqx_rule_engine, + emqx_modules, + emqx_management, + emqx_dashboard, + emqx_retainer, + emqx_prometheus, + emqx_psk, + emqx_slow_subs, + emqx_mongodb, + emqx_redis, + emqx_mysql, + emqx_plugins, + quicer, + bcrypt, + jq, + observer + ], + %% must always be of type `load' + ee_business_apps => + [ + emqx_license, + emqx_enterprise, + emqx_bridge_kafka, + emqx_bridge_pulsar, + emqx_bridge_gcp_pubsub, + emqx_bridge_cassandra, + emqx_bridge_opents, + emqx_bridge_clickhouse, + emqx_bridge_dynamo, + emqx_bridge_hstreamdb, + emqx_bridge_influxdb, + emqx_bridge_iotdb, + emqx_bridge_matrix, + emqx_bridge_mongodb, + emqx_bridge_mysql, + emqx_bridge_pgsql, + emqx_bridge_redis, + emqx_bridge_rocketmq, + emqx_bridge_tdengine, + emqx_bridge_timescale, + emqx_bridge_sqlserver, + emqx_oracle, + emqx_bridge_oracle, + emqx_bridge_rabbitmq, + emqx_schema_registry, + emqx_eviction_agent, + emqx_node_rebalance, + emqx_ft + ], + %% must always be of type `load' + ce_business_apps => + [emqx_telemetry] +}. diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 361566e96..f2490ddef 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -16,6 +16,7 @@ -module(emqx_machine_boot). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([post_boot/0]). -export([stop_apps/0, ensure_apps_started/0]). @@ -24,7 +25,6 @@ -export([stop_port_apps/0]). -dialyzer({no_match, [basic_reboot_apps/0]}). --dialyzer({no_match, [basic_reboot_apps_edition/1]}). -ifdef(TEST). -export([sorted_reboot_apps/1, reboot_apps/0]). @@ -94,7 +94,8 @@ stop_one_app(App) -> ensure_apps_started() -> ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}), - lists:foreach(fun start_one_app/1, sorted_reboot_apps()). + lists:foreach(fun start_one_app/1, sorted_reboot_apps()), + ?tp(emqx_machine_boot_apps_started, #{}). start_one_app(App) -> ?SLOG(debug, #{msg => "starting_app", app => App}), @@ -128,41 +129,62 @@ reboot_apps() -> BaseRebootApps ++ ConfigApps. basic_reboot_apps() -> - ?BASIC_REBOOT_APPS ++ - [ - emqx_prometheus, - emqx_modules, - emqx_dashboard, - emqx_connector, - emqx_gateway, - emqx_resource, - emqx_rule_engine, - emqx_bridge, - emqx_management, - emqx_retainer, - emqx_exhook, - emqx_authn, - emqx_authz, - emqx_slow_subs, - emqx_auto_subscribe, - emqx_plugins, - emqx_psk - ] ++ basic_reboot_apps_edition(emqx_release:edition()). + PrivDir = code:priv_dir(emqx_machine), + RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]), + {ok, [ + #{ + common_business_apps := CommonBusinessApps0, + ee_business_apps := EEBusinessApps, + ce_business_apps := CEBusinessApps + } + ]} = file:consult(RebootListPath), + Filters0 = maps:from_list([ + {App, is_app(App)} + || App <- [quicer, bcrypt, jq, observer] + ]), + CommonBusinessApps = + filter( + CommonBusinessApps0, + %% We don't need to restart these + Filters0#{ + system_monitor => false, + observer => false, + quicer => false + } + ), + EditionSpecificApps = + case emqx_release:edition() of + ee -> EEBusinessApps; + ce -> CEBusinessApps; + _ -> [] + end, + BusinessApps = CommonBusinessApps ++ EditionSpecificApps, + ?BASIC_REBOOT_APPS ++ BusinessApps. -basic_reboot_apps_edition(ce) -> - [emqx_telemetry]; -basic_reboot_apps_edition(ee) -> - [ - emqx_license, - emqx_s3, - emqx_ft, - emqx_eviction_agent, - emqx_node_rebalance, - emqx_schema_registry - ]; -%% unexcepted edition, should not happen -basic_reboot_apps_edition(_) -> - []. +filter(AppList, Filters) -> + lists:foldr( + fun(App, Acc) -> + AppName = + case App of + {Name, _Type} -> Name; + Name when is_atom(Name) -> Name + end, + ShouldKeep = maps:get(AppName, Filters, true), + case ShouldKeep of + true -> [App | Acc]; + false -> Acc + end + end, + [], + AppList + ). + +is_app(Name) -> + case application:load(Name) of + ok -> true; + {error, {already_loaded, _}} -> true; + _ -> false + end. sorted_reboot_apps() -> Apps = [{App, app_deps(App)} || App <- reboot_apps()], diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index 322d3ab75..0db2ea3c6 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -364,11 +364,7 @@ cluster(Config) -> {load_schema, true}, {start_autocluster, true}, {schema_mod, emqx_enterprise_schema}, - %% need to restart schema registry app in the tests so - %% that it re-registers the config handler that is lost - %% when emqx_conf restarts during join. - {env, [{emqx_machine, applications, [emqx_schema_registry]}]}, - {load_apps, [emqx_machine | ?APPS]}, + {load_apps, [emqx_machine]}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), @@ -388,6 +384,7 @@ start_cluster(Cluster) -> emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster ], + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -397,7 +394,11 @@ start_cluster(Cluster) -> Nodes ) end), - erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. wait_for_cluster_rpc(Node) -> @@ -658,7 +659,7 @@ t_cluster_serde_build(Config) -> Nodes = [N1, N2 | _] = start_cluster(Cluster), NumNodes = length(Nodes), wait_for_cluster_rpc(N2), - ?assertEqual( + ?assertMatch( ok, erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema]) ), @@ -687,7 +688,7 @@ t_cluster_serde_build(Config) -> {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := schema_registry_serdes_deleted}), NumNodes, - 5_000 + 10_000 ), ?assertEqual( ok, diff --git a/mix.exs b/mix.exs index fa2b291d8..5094ed07c 100644 --- a/mix.exs +++ b/mix.exs @@ -297,6 +297,7 @@ defmodule EMQXUmbrella.MixProject do [ applications: applications(edition_type), skip_mode_validation_for: [ + :emqx_mix, :emqx_gateway, :emqx_gateway_stomp, :emqx_gateway_mqttsn, @@ -316,7 +317,10 @@ defmodule EMQXUmbrella.MixProject do :emqx_auto_subscribe, :emqx_slow_subs, :emqx_plugins, - :emqx_ft + :emqx_ft, + :emqx_s3, + :emqx_durable_storage, + :rabbit_common ], steps: steps, strip_beams: false @@ -326,137 +330,57 @@ defmodule EMQXUmbrella.MixProject do end def applications(edition_type) do - system_apps = [ - crypto: :permanent, - public_key: :permanent, - asn1: :permanent, - syntax_tools: :permanent, - ssl: :permanent, - os_mon: :permanent, - inets: :permanent, - compiler: :permanent, - runtime_tools: :permanent, - redbug: :permanent, - xmerl: :permanent, - hocon: :load, - telemetry: :permanent - ] + {:ok, + [ + %{ + db_apps: db_apps, + system_apps: system_apps, + common_business_apps: common_business_apps, + ee_business_apps: ee_business_apps, + ce_business_apps: ce_business_apps + } + ]} = :file.consult("apps/emqx_machine/priv/reboot_lists.eterm") - db_apps = - if enable_rocksdb?() do - [:mnesia_rocksdb] + db_apps = filter(db_apps, %{mnesia_rocksdb: enable_rocksdb?()}) + + common_business_apps = + filter(common_business_apps, %{ + quicer: enable_quicer?(), + bcrypt: enable_bcrypt?(), + jq: enable_jq?(), + observer: is_app?(:observer) + }) + + edition_specific_apps = + if edition_type == :enterprise do + ee_business_apps else - [] - end ++ - [ - :mnesia, - :mria, - :ekka - ] + ce_business_apps + end - business_apps = - [ - :emqx, - :emqx_conf, - :esasl, - :observer_cli, - :tools, - :covertool, - :system_monitor, - :emqx_utils, - :emqx_http_lib, - :emqx_resource, - :emqx_connector, - :emqx_authn, - :emqx_authz, - :emqx_auto_subscribe, - :emqx_gateway, - :emqx_gateway_stomp, - :emqx_gateway_mqttsn, - :emqx_gateway_coap, - :emqx_gateway_lwm2m, - :emqx_gateway_exproto, - :emqx_exhook, - :emqx_bridge, - :emqx_bridge_mqtt, - :emqx_bridge_http, - :emqx_rule_engine, - :emqx_modules, - :emqx_management, - :emqx_dashboard, - :emqx_retainer, - :emqx_prometheus, - :emqx_psk, - :emqx_slow_subs, - :emqx_mongodb, - :emqx_redis, - :emqx_mysql, - :emqx_plugins, - :emqx_mix - ] ++ - if enable_quicer?() do - [:quicer] - else - [] - end ++ - if enable_bcrypt?() do - [:bcrypt] - else - [] - end ++ - if enable_jq?() do - [:jq] - else - [] - end ++ - if(is_app(:observer), - do: [:observer], - else: [] - ) ++ - case edition_type do - :enterprise -> - [ - :emqx_license, - :emqx_enterprise, - :emqx_bridge_kafka, - :emqx_bridge_pulsar, - :emqx_bridge_gcp_pubsub, - :emqx_bridge_cassandra, - :emqx_bridge_opents, - :emqx_bridge_clickhouse, - :emqx_bridge_dynamo, - :emqx_bridge_hstreamdb, - :emqx_bridge_influxdb, - :emqx_bridge_iotdb, - :emqx_bridge_matrix, - :emqx_bridge_mongodb, - :emqx_bridge_mysql, - :emqx_bridge_pgsql, - :emqx_bridge_redis, - :emqx_bridge_rocketmq, - :emqx_bridge_tdengine, - :emqx_bridge_timescale, - :emqx_bridge_sqlserver, - :emqx_oracle, - :emqx_bridge_oracle, - :emqx_bridge_rabbitmq, - :emqx_schema_registry, - :emqx_eviction_agent, - :emqx_node_rebalance, - :emqx_ft - ] + business_apps = common_business_apps ++ edition_specific_apps - _ -> - [:emqx_telemetry] - end - - system_apps ++ + Enum.map(system_apps, fn app -> + if is_atom(app), do: {app, :permanent}, else: app + end) ++ Enum.map(db_apps, &{&1, :load}) ++ [emqx_machine: :permanent] ++ Enum.map(business_apps, &{&1, :load}) end - defp is_app(name) do + defp filter(apps, filters) do + Enum.filter(apps, fn app -> + app_name = + case app do + {app_name, _type} -> app_name + app_name when is_atom(app_name) -> app_name + end + + Map.get(filters, app_name, true) + end) + end + + defp is_app?(name) do case Application.load(name) do :ok -> true diff --git a/rebar.config.erl b/rebar.config.erl index f33d84006..a326d6fdc 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -386,85 +386,48 @@ overlay_vars_pkg(pkg) -> ]. relx_apps(ReleaseType, Edition) -> - SystemApps = - [ - kernel, - sasl, - crypto, - public_key, - asn1, - syntax_tools, - ssl, - os_mon, - inets, - compiler, - runtime_tools, - redbug, - xmerl, - {hocon, load}, - telemetry - ], - DBApps = - [mnesia_rocksdb || is_rocksdb_supported()] ++ - [ - mnesia, - mria, - ekka - ], - BusinessApps = - [ - emqx, - emqx_conf, - - esasl, - observer_cli, - tools, - covertool, - % started by emqx_machine - system_monitor, - emqx_utils, - emqx_http_lib, - emqx_resource, - emqx_connector, - emqx_authn, - emqx_authz, - emqx_auto_subscribe, - emqx_gateway, - emqx_gateway_stomp, - emqx_gateway_mqttsn, - emqx_gateway_coap, - emqx_gateway_lwm2m, - emqx_gateway_exproto, - emqx_exhook, - emqx_bridge, - emqx_bridge_mqtt, - emqx_bridge_http, - emqx_rule_engine, - emqx_modules, - emqx_management, - emqx_dashboard, - emqx_retainer, - emqx_prometheus, - emqx_psk, - emqx_slow_subs, - emqx_mongodb, - emqx_redis, - emqx_mysql, - emqx_plugins - ] ++ - [quicer || is_quicer_supported()] ++ - [bcrypt || provide_bcrypt_release(ReleaseType)] ++ - %% Started automatically when needed (only needs to be started when the - %% port implementation is used) - [jq || is_jq_supported()] ++ - [observer || is_app(observer)] ++ - relx_apps_per_edition(Edition), + {ok, [ + #{ + db_apps := DBApps0, + system_apps := SystemApps, + common_business_apps := CommonBusinessApps0, + ee_business_apps := EEBusinessApps, + ce_business_apps := CEBusinessApps + } + ]} = file:consult("apps/emqx_machine/priv/reboot_lists.eterm"), + DBApps = filter(DBApps0, #{mnesia_rocksdb => is_rocksdb_supported()}), + CommonBusinessApps = + filter(CommonBusinessApps0, #{ + quicer => is_quicer_supported(), + bcrypt => provide_bcrypt_release(ReleaseType), + jq => is_jq_supported(), + observer => is_app(observer) + }), + EditionSpecificApps = + case Edition of + ee -> EEBusinessApps; + ce -> CEBusinessApps + end, + BusinessApps = CommonBusinessApps ++ EditionSpecificApps, SystemApps ++ %% EMQX starts the DB and the business applications: [{App, load} || App <- DBApps] ++ [emqx_machine] ++ [{App, load} || App <- BusinessApps]. +filter(AppList, Filters) -> + lists:filter( + fun(App) -> + AppName = + case App of + {Name, _Type} -> Name; + Name when is_atom(Name) -> Name + end, + maps:get(AppName, Filters, true) + end, + AppList + ). + is_app(Name) -> case application:load(Name) of ok -> true; @@ -472,40 +435,6 @@ is_app(Name) -> _ -> false end. -relx_apps_per_edition(ee) -> - [ - emqx_license, - emqx_enterprise, - emqx_bridge_kafka, - emqx_bridge_pulsar, - emqx_bridge_gcp_pubsub, - emqx_bridge_cassandra, - emqx_bridge_opents, - emqx_bridge_clickhouse, - emqx_bridge_dynamo, - emqx_bridge_hstreamdb, - emqx_bridge_influxdb, - emqx_bridge_iotdb, - emqx_bridge_matrix, - emqx_bridge_mongodb, - emqx_bridge_mysql, - emqx_bridge_pgsql, - emqx_bridge_redis, - emqx_bridge_rocketmq, - emqx_bridge_tdengine, - emqx_bridge_timescale, - emqx_bridge_sqlserver, - emqx_oracle, - emqx_bridge_oracle, - emqx_bridge_rabbitmq, - emqx_schema_registry, - emqx_eviction_agent, - emqx_node_rebalance, - emqx_ft - ]; -relx_apps_per_edition(ce) -> - [emqx_telemetry]. - relx_overlay(ReleaseType, Edition) -> [ {mkdir, "log/"},