fix(machine_boot): ensure `emqx_bridge` starts after its companion apps

We need to reverse the dependency of `emqx_bridge` and `emqx_bridge_*`, because the former
loads and starts bridges during its application startup.  If the individual bridge
application being loaded has not started with its dependencies, the supervision tree will
not be ready for that.
This commit is contained in:
Thales Macedo Garitezi 2023-07-20 11:30:40 -03:00
parent 77e2d852e5
commit 6cd503865b
26 changed files with 43 additions and 30 deletions

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
ecql ecql
]}, ]},
{env, []}, {env, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
clickhouse clickhouse
]}, ]},
{env, []}, {env, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
erlcloud erlcloud
]}, ]},
{env, []}, {env, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge_http,
ehttpc ehttpc
]}, ]},
{env, []}, {env, []},

View File

@ -598,6 +598,7 @@ start_cluster(Cluster) ->
end, end,
Cluster Cluster
), ),
NumNodes = length(Nodes),
on_exit(fun() -> on_exit(fun() ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
@ -607,6 +608,11 @@ start_cluster(Cluster) ->
Nodes Nodes
) )
end), end),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
),
Nodes. Nodes.
wait_for_cluster_rpc(Node) -> wait_for_cluster_rpc(Node) ->

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
hstreamdb_erl hstreamdb_erl
]}, ]},
{env, []}, {env, []},

View File

@ -2,7 +2,7 @@
{description, "EMQX HTTP Bridge and Connector Application"}, {description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.1.1"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{links, []} {links, []}

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
influxdb influxdb
]}, ]},
{env, []}, {env, []},

View File

@ -11,7 +11,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge_http,
%% for module emqx_connector_http %% for module emqx_connector_http
emqx_connector emqx_connector
]}, ]},

View File

@ -7,7 +7,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
telemetry, telemetry,
wolff, wolff,
brod, brod,

View File

@ -1071,13 +1071,14 @@ cluster(Config) ->
Cluster = emqx_common_test_helpers:emqx_cluster( Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core], [core, core],
[ [
{apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]}, {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]},
{listener_ports, []}, {listener_ports, []},
{peer_mod, PeerModule}, {peer_mod, PeerModule},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{load_schema, true}, {load_schema, true},
{start_autocluster, true}, {start_autocluster, true},
{schema_mod, emqx_enterprise_schema}, {schema_mod, emqx_enterprise_schema},
{load_apps, [emqx_machine]},
{env_handler, fun {env_handler, fun
(emqx) -> (emqx) ->
application:set_env(emqx, boot_modules, [broker, router]), application:set_env(emqx, boot_modules, [broker, router]),
@ -1901,6 +1902,7 @@ t_cluster_node_down(Config) ->
?check_trace( ?check_trace(
begin begin
{_N2, Opts2} = lists:nth(2, Cluster), {_N2, Opts2} = lists:nth(2, Cluster),
NumNodes = length(Cluster),
Nodes = Nodes =
[N1, N2 | _] = [N1, N2 | _] =
lists:map( lists:map(
@ -1925,6 +1927,11 @@ t_cluster_node_down(Config) ->
15_000 15_000
), ),
wait_for_cluster_rpc(N2), wait_for_cluster_rpc(N2),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
),
erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end), erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
lists:foreach( lists:foreach(
@ -1980,7 +1987,7 @@ t_cluster_node_down(Config) ->
?assertEqual(NPartitions, map_size(Assignments)), ?assertEqual(NPartitions, map_size(Assignments)),
NumPublished = ets:info(TId, size), NumPublished = ets:info(TId, size),
%% All published messages are eventually received. %% All published messages are eventually received.
Published = receive_published(#{n => NumPublished, timeout => 3_000}), Published = receive_published(#{n => NumPublished, timeout => 10_000}),
ct:pal("published:\n ~p", [Published]), ct:pal("published:\n ~p", [Published]),
ok ok
end end

View File

@ -5,8 +5,7 @@
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource
emqx_bridge
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -7,7 +7,6 @@
stdlib, stdlib,
emqx_connector, emqx_connector,
emqx_resource, emqx_resource,
emqx_bridge,
emqx_mongodb emqx_mongodb
]}, ]},
{env, []}, {env, []},

View File

@ -1,14 +1,13 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
emqx, emqx,
emqx_resource, emqx_resource,
emqx_bridge,
emqtt emqtt
]}, ]},
{env, []}, {env, []},

View File

@ -7,7 +7,6 @@
stdlib, stdlib,
emqx_connector, emqx_connector,
emqx_resource, emqx_resource,
emqx_bridge,
emqx_mysql emqx_mysql
]}, ]},
{env, []}, {env, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
opentsdb opentsdb
]}, ]},
{env, []}, {env, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
emqx_oracle emqx_oracle
]}, ]},
{env, []}, {env, []},

View File

@ -5,8 +5,7 @@
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource
emqx_bridge
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
pulsar pulsar
]}, ]},
{env, []}, {env, []},

View File

@ -6,8 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
ecql,
rabbit_common, rabbit_common,
amqp_client amqp_client
]}, ]},

View File

@ -7,7 +7,6 @@
stdlib, stdlib,
emqx_connector, emqx_connector,
emqx_resource, emqx_resource,
emqx_bridge,
emqx_redis emqx_redis
]}, ]},
{env, []}, {env, []},

View File

@ -2,7 +2,7 @@
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{links, []} {links, []}

View File

@ -2,7 +2,7 @@
{description, "EMQX Enterprise SQL Server Bridge"}, {description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]}, {applications, [kernel, stdlib, emqx_resource, odbc]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{links, []} {links, []}

View File

@ -6,7 +6,6 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge,
tdengine tdengine
]}, ]},
{env, []}, {env, []},

View File

@ -2,7 +2,7 @@
{description, "EMQX Enterprise TimescaleDB Bridge"}, {description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, emqx_bridge]}, {applications, [kernel, stdlib, emqx_resource]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{links, []} {links, []}

View File

@ -160,7 +160,8 @@ is_app(Name) ->
end. end.
sorted_reboot_apps() -> sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()], Apps0 = [{App, app_deps(App)} || App <- reboot_apps()],
Apps = inject_bridge_deps(Apps0),
sorted_reboot_apps(Apps). sorted_reboot_apps(Apps).
app_deps(App) -> app_deps(App) ->
@ -169,6 +170,25 @@ app_deps(App) ->
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end. end.
%% `emqx_bridge' is special in that it needs all the bridges apps to
%% be started before it, so that, when it loads the bridges from
%% configuration, the bridge app and its dependencies need to be up.
inject_bridge_deps(RebootAppDeps) ->
BridgeApps = [
App
|| {App, _Deps} <- RebootAppDeps,
lists:prefix("emqx_bridge_", atom_to_list(App))
],
lists:map(
fun
({emqx_bridge, Deps0}) when is_list(Deps0) ->
{emqx_bridge, Deps0 ++ BridgeApps};
(App) ->
App
end,
RebootAppDeps
).
sorted_reboot_apps(Apps) -> sorted_reboot_apps(Apps) ->
G = digraph:new(), G = digraph:new(),
try try