fix(plugins): ensure plugin apps are restarted when restarting `emqx_plugins`

Fixes https://emqx.atlassian.net/browse/EMQX-12628
Fixes https://github.com/emqx/emqx/issues/13378
This commit is contained in:
Thales Macedo Garitezi 2024-07-02 10:36:00 -03:00
parent dea2bf19b1
commit e1420a27bb
7 changed files with 163 additions and 7 deletions

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"}, {id, "emqx_machine"},
{description, "The EMQX Machine"}, {description, "The EMQX Machine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "0.3.2"}, {vsn, "0.3.3"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_ctl, redbug]}, {applications, [kernel, stdlib, emqx_ctl, redbug]},

View File

@ -66,6 +66,7 @@ stop_apps() ->
?SLOG(notice, #{msg => "stopping_emqx_apps"}), ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(), _ = emqx_alarm_handler:unload(),
ok = emqx_conf_app:unset_config_loaded(), ok = emqx_conf_app:unset_config_loaded(),
ok = emqx_plugins:ensure_stopped(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
%% Those port apps are terminated after the main apps %% Those port apps are terminated after the main apps

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugins, [ {application, emqx_plugins, [
{description, "EMQX Plugin Management"}, {description, "EMQX Plugin Management"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{modules, []}, {modules, []},
{mod, {emqx_plugins_app, []}}, {mod, {emqx_plugins_app, []}},
{applications, [kernel, stdlib, emqx, erlavro]}, {applications, [kernel, stdlib, emqx, erlavro]},

View File

@ -299,8 +299,10 @@ ensure_stopped() ->
Fun = fun Fun = fun
(#{name_vsn := NameVsn, enable := true}) -> (#{name_vsn := NameVsn, enable := true}) ->
case ensure_stopped(NameVsn) of case ensure_stopped(NameVsn) of
ok -> []; ok ->
{error, Reason} -> [{NameVsn, Reason}] [];
{error, Reason} ->
[{NameVsn, Reason}]
end; end;
(#{name_vsn := NameVsn, enable := false}) -> (#{name_vsn := NameVsn, enable := false}) ->
?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}), ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
@ -1077,15 +1079,15 @@ stop_app(App) ->
case application:stop(App) of case application:stop(App) of
ok -> ok ->
?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}), ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
ok = unload_moudle_and_app(App); ok = unload_module_and_app(App);
{error, {not_started, App}} -> {error, {not_started, App}} ->
?SLOG(debug, #{msg => "plugin_not_started", app => App}), ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
ok = unload_moudle_and_app(App); ok = unload_module_and_app(App);
{error, Reason} -> {error, Reason} ->
throw(#{msg => "failed_to_stop_app", app => App, reason => Reason}) throw(#{msg => "failed_to_stop_app", app => App, reason => Reason})
end. end.
unload_moudle_and_app(App) -> unload_module_and_app(App) ->
case application:get_key(App, modules) of case application:get_key(App, modules) of
{ok, Modules} -> {ok, Modules} ->
lists:foreach(fun code:soft_purge/1, Modules); lists:foreach(fun code:soft_purge/1, Modules);

View File

@ -19,6 +19,7 @@
-behaviour(application). -behaviour(application).
-include("emqx_plugins.hrl"). -include("emqx_plugins.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([ -export([
start/2, start/2,
@ -31,6 +32,7 @@ start(_Type, _Args) ->
ok = emqx_plugins:ensure_installed(), ok = emqx_plugins:ensure_installed(),
ok = emqx_plugins:ensure_started(), ok = emqx_plugins:ensure_started(),
ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins), ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins),
?tp("emqx_plugins_app_started", #{}),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@ -48,6 +48,8 @@
-define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2"). -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2").
-define(PACKAGE_SUFFIX, ".tar.gz"). -define(PACKAGE_SUFFIX, ".tar.gz").
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
all() -> all() ->
[ [
{group, copy_plugin}, {group, copy_plugin},
@ -140,6 +142,39 @@ bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
bin(B) when is_binary(B) -> B. bin(B) when is_binary(B) -> B.
hookpoints() ->
[
'client.connect',
'client.connack',
'client.connected',
'client.disconnected',
'client.authenticate',
'client.authorize',
'client.subscribe',
'client.unsubscribe',
'session.created',
'session.subscribed',
'session.unsubscribed',
'session.resumed',
'session.discarded',
'session.takenover',
'session.terminated',
'message.publish',
'message.puback',
'message.delivered',
'message.acked',
'message.dropped'
].
get_hook_modules() ->
lists:flatmap(
fun(HookPoint) ->
CBs = emqx_hooks:lookup(HookPoint),
[Mod || {callback, {Mod, _Fn, _Args}, _Filter, _Prio} <- CBs]
end,
hookpoints()
).
t_demo_install_start_stop_uninstall({init, Config}) -> t_demo_install_start_stop_uninstall({init, Config}) ->
Opts = #{package := Package} = get_demo_plugin_package(), Opts = #{package := Package} = get_demo_plugin_package(),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
@ -256,9 +291,18 @@ t_start_restart_and_stop({init, Config}) ->
t_start_restart_and_stop({'end', _Config}) -> t_start_restart_and_stop({'end', _Config}) ->
ok; ok;
t_start_restart_and_stop(Config) -> t_start_restart_and_stop(Config) ->
%% pre-condition
Hooks0 = get_hook_modules(),
?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks0), #{hooks => Hooks0}),
NameVsn = proplists:get_value(name_vsn, Config), NameVsn = proplists:get_value(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn), ok = emqx_plugins:ensure_installed(NameVsn),
ok = emqx_plugins:ensure_enabled(NameVsn), ok = emqx_plugins:ensure_enabled(NameVsn),
%% Application is not yet started.
Hooks1 = get_hook_modules(),
?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks1), #{hooks => Hooks1}),
FakeInfo = FakeInfo =
"name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"]," "name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"],"
"description=\"desc bar\"", "description=\"desc bar\"",
@ -271,6 +315,10 @@ t_start_restart_and_stop(Config) ->
ok = emqx_plugins:ensure_started(), ok = emqx_plugins:ensure_started(),
assert_app_running(?EMQX_PLUGIN_APP_NAME, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
%% Should have called the application start callback, which in turn adds hooks.
Hooks2 = get_hook_modules(),
?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks2), #{hooks => Hooks2}),
%% fake enable bar-2 %% fake enable bar-2
ok = ensure_state(Bar2, rear, true), ok = ensure_state(Bar2, rear, true),
%% should cause an error %% should cause an error
@ -292,6 +340,10 @@ t_start_restart_and_stop(Config) ->
assert_app_running(?EMQX_PLUGIN_APP_NAME, false), assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
ok = ensure_state(Bar2, rear, false), ok = ensure_state(Bar2, rear, false),
%% Should have called the application stop callback, which removes the hooks.
Hooks3 = get_hook_modules(),
?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks3), #{hooks => Hooks3}),
ok = emqx_plugins:restart(NameVsn), ok = emqx_plugins:restart(NameVsn),
assert_app_running(?EMQX_PLUGIN_APP_NAME, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
%% repeat %% repeat
@ -371,6 +423,15 @@ assert_app_running(Name, false) ->
AllApps = application:which_applications(), AllApps = application:which_applications(),
?assertEqual(false, lists:keyfind(Name, 1, AllApps)). ?assertEqual(false, lists:keyfind(Name, 1, AllApps)).
assert_started_and_hooks_loaded() ->
PluginConfig = emqx_plugins:list(),
ct:pal("plugin config:\n ~p", [PluginConfig]),
?assertMatch([_], PluginConfig),
assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
Hooks = get_hook_modules(),
?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks), #{hooks => Hooks}),
ok.
t_bad_tar_gz({init, Config}) -> t_bad_tar_gz({init, Config}) ->
Config; Config;
t_bad_tar_gz({'end', _Config}) -> t_bad_tar_gz({'end', _Config}) ->
@ -841,6 +902,95 @@ group_t_cluster_leave(Config) ->
), ),
ok. ok.
%% Checks that starting a node with a plugin enabled starts it correctly, and that the
%% hooks added by the plugin's `application:start/2' callback are indeed in place.
%% See also: https://github.com/emqx/emqx/issues/13378
t_start_node_with_plugin_enabled({init, Config}) ->
#{package := Package, shdir := InstallDir} = get_demo_plugin_package(),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
AppSpecs = [
emqx,
emqx_conf,
emqx_ctl,
{emqx_plugins, #{
config =>
#{
plugins =>
#{
install_dir => InstallDir,
states =>
[
#{
enable => true,
name_vsn => NameVsn
}
]
}
}
}}
],
Name1 = t_cluster_start_enabled1,
Name2 = t_cluster_start_enabled2,
Specs = emqx_cth_cluster:mk_nodespecs(
[
{Name1, #{role => core, apps => AppSpecs, join_to => undefined}},
{Name2, #{role => core, apps => AppSpecs, join_to => undefined}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
Names = [Name1, Name2],
Nodes = [emqx_cth_cluster:node_name(N) || N <- Names],
[
{node_specs, Specs},
{nodes, Nodes},
{name_vsn, NameVsn}
| Config
];
t_start_node_with_plugin_enabled({'end', Config}) ->
Nodes = ?config(nodes, Config),
ok = emqx_cth_cluster:stop(Nodes),
ok;
t_start_node_with_plugin_enabled(Config) when is_list(Config) ->
NodeSpecs = ?config(node_specs, Config),
?check_trace(
#{timetrap => 10_000},
begin
[N1, N2 | _] = emqx_cth_cluster:start(NodeSpecs),
?ON(N1, assert_started_and_hooks_loaded()),
?ON(N2, assert_started_and_hooks_loaded()),
%% Now make them join.
%% N.B.: We need to start autocluster so that applications are restarted in
%% order, and also we need to override the config loader to emulate what
%% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
%% keys.
ok = ?ON(N2, emqx_machine_boot:start_autocluster()),
?ON(N2, begin
StartCallback0 =
case ekka:env({callback, start}) of
{ok, SC0} -> SC0;
_ -> fun() -> ok end
end,
StartCallback = fun() ->
ok = emqx_app:set_config_loader(emqx_cth_suite),
StartCallback0()
end,
ekka:callback(start, StartCallback)
end),
{ok, {ok, _}} =
?wait_async_action(
?ON(N2, ekka:join(N1)),
#{?snk_kind := "emqx_plugins_app_started"}
),
ct:pal("checking N1 state"),
?ON(N1, assert_started_and_hooks_loaded()),
ct:pal("checking N2 state"),
?ON(N2, assert_started_and_hooks_loaded()),
ok
end,
[]
),
ok.
make_tar(Cwd, NameWithVsn) -> make_tar(Cwd, NameWithVsn) ->
make_tar(Cwd, NameWithVsn, NameWithVsn). make_tar(Cwd, NameWithVsn, NameWithVsn).

View File

@ -0,0 +1 @@
Fixed an issue where plugin applications were not restarted after a node joins a cluster, leading to an inconsistent state where hooks were not properly installed.