From 6cd503865bf4cbc5d0b1579cf6c7c91d646623f9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 20 Jul 2023 11:30:40 -0300 Subject: [PATCH] fix(machine_boot): ensure `emqx_bridge` starts after its companion apps We need to reverse the dependency of `emqx_bridge` and `emqx_bridge_*`, because the former loads and starts bridges during its application startup. If the individual bridge application being loaded has not started with its dependencies, the supervision tree will not be ready for that. --- .../src/emqx_bridge_cassandra.app.src | 1 - .../src/emqx_bridge_clickhouse.app.src | 1 - .../src/emqx_bridge_dynamo.app.src | 1 - .../src/emqx_bridge_gcp_pubsub.app.src | 1 - .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 6 +++++ .../src/emqx_bridge_hstreamdb.app.src | 1 - .../src/emqx_bridge_http.app.src | 2 +- .../src/emqx_bridge_influxdb.app.src | 1 - .../src/emqx_bridge_iotdb.app.src | 1 - .../src/emqx_bridge_kafka.app.src | 1 - .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 11 ++++++++-- .../src/emqx_bridge_matrix.app.src | 3 +-- .../src/emqx_bridge_mongodb.app.src | 1 - .../src/emqx_bridge_mqtt.app.src | 3 +-- .../src/emqx_bridge_mysql.app.src | 1 - .../src/emqx_bridge_opents.app.src | 1 - .../src/emqx_bridge_oracle.app.src | 1 - .../src/emqx_bridge_pgsql.app.src | 3 +-- .../src/emqx_bridge_pulsar.app.src | 1 - .../src/emqx_bridge_rabbitmq.app.src | 2 -- .../src/emqx_bridge_redis.app.src | 1 - .../src/emqx_bridge_rocketmq.app.src | 2 +- .../src/emqx_bridge_sqlserver.app.src | 2 +- .../src/emqx_bridge_tdengine.app.src | 1 - .../src/emqx_bridge_timescale.app.src | 2 +- apps/emqx_machine/src/emqx_machine_boot.erl | 22 ++++++++++++++++++- 26 files changed, 43 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index f449588cc..de790ab46 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, ecql ]}, {env, []}, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index cfb08f47b..a10361c51 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, clickhouse ]}, {env, []}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 824f5ee7b..ed5078432 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, erlcloud ]}, {env, []}, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 10722e6ce..53d36361c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge_http, ehttpc ]}, {env, []}, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 2a04b5ee1..80ffdfe1a 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -598,6 +598,7 @@ start_cluster(Cluster) -> end, Cluster ), + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -607,6 +608,11 @@ start_cluster(Cluster) -> Nodes ) end), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. wait_for_cluster_rpc(Node) -> diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index 0549dd020..2a800baca 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, hstreamdb_erl ]}, {env, []}, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index f8097dec2..859f80f53 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -2,7 +2,7 @@ {description, "EMQX HTTP Bridge and Connector Application"}, {vsn, "0.1.1"}, {registered, []}, - {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]}, + {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 71b95a40d..a612c225b 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, influxdb ]}, {env, []}, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index d26cbe873..b79c4c2ce 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -11,7 +11,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge_http, %% for module emqx_connector_http emqx_connector ]}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 87c1841e5..997b768b3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -7,7 +7,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, telemetry, wolff, brod, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 74fde6426..f1f2ce362 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1071,13 +1071,14 @@ cluster(Config) -> Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ - {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]}, + {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]}, {listener_ports, []}, {peer_mod, PeerModule}, {priv_data_dir, PrivDataDir}, {load_schema, true}, {start_autocluster, true}, {schema_mod, emqx_enterprise_schema}, + {load_apps, [emqx_machine]}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), @@ -1901,6 +1902,7 @@ t_cluster_node_down(Config) -> ?check_trace( begin {_N2, Opts2} = lists:nth(2, Cluster), + NumNodes = length(Cluster), Nodes = [N1, N2 | _] = lists:map( @@ -1925,6 +1927,11 @@ t_cluster_node_down(Config) -> 15_000 ), wait_for_cluster_rpc(N2), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end), {ok, _} = snabbkaffe:receive_events(SRef0), lists:foreach( @@ -1980,7 +1987,7 @@ t_cluster_node_down(Config) -> ?assertEqual(NPartitions, map_size(Assignments)), NumPublished = ets:info(TId, size), %% All published messages are eventually received. - Published = receive_published(#{n => NumPublished, timeout => 3_000}), + Published = receive_published(#{n => NumPublished, timeout => 10_000}), ct:pal("published:\n ~p", [Published]), ok end diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 42129bfc7..14aca1f75 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -5,8 +5,7 @@ {applications, [ kernel, stdlib, - emqx_resource, - emqx_bridge + emqx_resource ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index fa3ebd3c9..35bcc3fc4 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_mongodb ]}, {env, []}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 853e98eb5..052b271f8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,14 +1,13 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, stdlib, emqx, emqx_resource, - emqx_bridge, emqtt ]}, {env, []}, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 2ecdd6a6a..252b8ff00 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_mysql ]}, {env, []}, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 6ec938afd..b106bb844 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, opentsdb ]}, {env, []}, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index a05533da3..4f46ce464 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, emqx_oracle ]}, {env, []}, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index ade791a6d..85131baf0 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -5,8 +5,7 @@ {applications, [ kernel, stdlib, - emqx_resource, - emqx_bridge + emqx_resource ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index 99fb25c33..ed468a833 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, pulsar ]}, {env, []}, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index e9ef4d524..c6c0c3897 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -6,8 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, - ecql, rabbit_common, amqp_client ]}, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index bc21adcad..b380bc86d 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_redis ]}, {env, []}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index e18b98e3a..e158a2e46 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise RocketMQ Bridge"}, {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]}, + {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 35f4587b0..3aa8b3b68 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise SQL Server Bridge"}, {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]}, + {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index e4c946162..be57fa880 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, tdengine ]}, {env, []}, diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index 7a4aeeb56..adb024591 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise TimescaleDB Bridge"}, {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge]}, + {applications, [kernel, stdlib, emqx_resource]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index cdb2c24e8..82b909b4f 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -160,7 +160,8 @@ is_app(Name) -> end. sorted_reboot_apps() -> - Apps = [{App, app_deps(App)} || App <- reboot_apps()], + Apps0 = [{App, app_deps(App)} || App <- reboot_apps()], + Apps = inject_bridge_deps(Apps0), sorted_reboot_apps(Apps). app_deps(App) -> @@ -169,6 +170,25 @@ app_deps(App) -> {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) end. +%% `emqx_bridge' is special in that it needs all the bridges apps to +%% be started before it, so that, when it loads the bridges from +%% configuration, the bridge app and its dependencies need to be up. +inject_bridge_deps(RebootAppDeps) -> + BridgeApps = [ + App + || {App, _Deps} <- RebootAppDeps, + lists:prefix("emqx_bridge_", atom_to_list(App)) + ], + lists:map( + fun + ({emqx_bridge, Deps0}) when is_list(Deps0) -> + {emqx_bridge, Deps0 ++ BridgeApps}; + (App) -> + App + end, + RebootAppDeps + ). + sorted_reboot_apps(Apps) -> G = digraph:new(), try