fix: prevent use pd

This commit is contained in:
JimMoen 2024-06-19 16:01:09 +08:00
parent b7b2a08399
commit a6fa3e82d9
No known key found for this signature in database
3 changed files with 45 additions and 30 deletions

View File

@ -564,8 +564,7 @@ install_package(FileName, Bin) ->
ok = filelib:ensure_dir(File), ok = filelib:ensure_dir(File),
ok = file:write_file(File, Bin), ok = file:write_file(File, Bin),
PackageName = string:trim(FileName, trailing, ".tar.gz"), PackageName = string:trim(FileName, trailing, ".tar.gz"),
put(?fresh_install, true), case emqx_plugins:ensure_installed(PackageName, ?fresh_install) of
case emqx_plugins:ensure_installed(PackageName) of
{error, #{reason := plugin_not_found}} = NotFound -> {error, #{reason := plugin_not_found}} = NotFound ->
NotFound; NotFound;
{error, Reason} = Error -> {error, Reason} = Error ->

View File

@ -27,6 +27,7 @@
-define(plugin_conf_not_found, plugin_conf_not_found). -define(plugin_conf_not_found, plugin_conf_not_found).
-define(plugin_without_config_schema, plugin_without_config_schema). -define(plugin_without_config_schema, plugin_without_config_schema).
-define(fresh_install, fresh_install). -define(fresh_install, fresh_install).
-define(normal, normal).
-type schema_name() :: binary(). -type schema_name() :: binary().
-type avsc_path() :: string(). -type avsc_path() :: string().

View File

@ -39,6 +39,7 @@
-export([ -export([
ensure_installed/0, ensure_installed/0,
ensure_installed/1, ensure_installed/1,
ensure_installed/2,
ensure_uninstalled/1, ensure_uninstalled/1,
ensure_enabled/1, ensure_enabled/1,
ensure_enabled/2, ensure_enabled/2,
@ -169,19 +170,27 @@ ensure_installed(NameVsn) ->
case read_plugin_info(NameVsn, #{}) of case read_plugin_info(NameVsn, #{}) of
{ok, _} -> {ok, _} ->
ok, ok,
_ = maybe_ensure_plugin_config(NameVsn); _ = maybe_ensure_plugin_config(NameVsn, ?normal);
{error, _} -> {error, _} ->
ok = purge(NameVsn), ok = purge(NameVsn),
case ensure_exists_and_installed(NameVsn) of case ensure_exists_and_installed(NameVsn) of
ok -> ok ->
maybe_post_op_after_installed(NameVsn), maybe_post_op_after_installed(NameVsn, ?normal),
_ = maybe_ensure_plugin_config(NameVsn),
ok; ok;
{error, _Reason} = Err -> {error, _Reason} = Err ->
Err Err
end end
end. end.
ensure_installed(NameVsn, ?fresh_install = Mode) ->
case ensure_exists_and_installed(NameVsn) of
ok ->
maybe_post_op_after_installed(NameVsn, Mode),
ok;
{error, _Reason} = Err ->
Err
end.
%% @doc Ensure files and directories for the given plugin are being deleted. %% @doc Ensure files and directories for the given plugin are being deleted.
%% If a plugin is running, or enabled, an error is returned. %% If a plugin is running, or enabled, an error is returned.
-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
@ -791,6 +800,11 @@ get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) -> get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
{ok, _} = Res -> {ok, _} = Res ->
?SLOG(debug, #{
msg => "get_plugin_tar_from_cluster_successfully",
node => Node,
name_vsn => NameVsn
}),
Res; Res;
Err -> Err ->
get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors]) get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
@ -805,6 +819,11 @@ get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
) )
of of
{ok, _} = Res -> {ok, _} = Res ->
?SLOG(debug, #{
msg => "get_plugin_config_from_cluster_successfully",
node => Node,
name_vsn => NameVsn
}),
Res; Res;
Err -> Err ->
get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
@ -1156,9 +1175,9 @@ for_plugins(ActionFun) ->
ok ok
end. end.
maybe_post_op_after_installed(NameVsn0) -> maybe_post_op_after_installed(NameVsn0, Mode) ->
NameVsn = wrap_to_list(NameVsn0), NameVsn = wrap_to_list(NameVsn0),
_ = maybe_load_config_schema(NameVsn), _ = maybe_load_config_schema(NameVsn, Mode),
ok = maybe_ensure_state(NameVsn). ok = maybe_ensure_state(NameVsn).
maybe_ensure_state(NameVsn) -> maybe_ensure_state(NameVsn) ->
@ -1183,13 +1202,13 @@ maybe_ensure_state(NameVsn) ->
end, end,
ok. ok.
maybe_load_config_schema(NameVsn) -> maybe_load_config_schema(NameVsn, Mode) ->
AvscPath = avsc_file_path(NameVsn), AvscPath = avsc_file_path(NameVsn),
_ = _ =
with_plugin_avsc(NameVsn) andalso with_plugin_avsc(NameVsn) andalso
filelib:is_regular(AvscPath) andalso filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath), do_load_config_schema(NameVsn, AvscPath),
_ = maybe_create_config_dir(NameVsn). _ = maybe_create_config_dir(NameVsn, Mode).
do_load_config_schema(NameVsn, AvscPath) -> do_load_config_schema(NameVsn, AvscPath) ->
case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of
@ -1198,11 +1217,11 @@ do_load_config_schema(NameVsn, AvscPath) ->
{error, _Reason} -> ok {error, _Reason} -> ok
end. end.
maybe_create_config_dir(NameVsn) -> maybe_create_config_dir(NameVsn, Mode) ->
with_plugin_avsc(NameVsn) andalso with_plugin_avsc(NameVsn) andalso
do_create_config_dir(NameVsn). do_create_config_dir(NameVsn, Mode).
do_create_config_dir(NameVsn) -> do_create_config_dir(NameVsn, Mode) ->
case plugin_config_dir(NameVsn) of case plugin_config_dir(NameVsn) of
{error, Reason} -> {error, Reason} ->
{error, {gen_config_dir_failed, Reason}}; {error, {gen_config_dir_failed, Reason}};
@ -1210,7 +1229,7 @@ do_create_config_dir(NameVsn) ->
case filelib:ensure_path(ConfigDir) of case filelib:ensure_path(ConfigDir) of
ok -> ok ->
%% get config from other nodes or get from tarball %% get config from other nodes or get from tarball
_ = maybe_ensure_plugin_config(NameVsn), _ = maybe_ensure_plugin_config(NameVsn, Mode),
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{
@ -1222,29 +1241,25 @@ do_create_config_dir(NameVsn) ->
end end
end. end.
-spec maybe_ensure_plugin_config(name_vsn()) -> ok. -spec maybe_ensure_plugin_config(name_vsn(), ?fresh_install | ?normal) -> ok.
maybe_ensure_plugin_config(NameVsn) -> maybe_ensure_plugin_config(NameVsn, Mode) ->
maybe maybe
true ?= with_plugin_avsc(NameVsn), true ?= with_plugin_avsc(NameVsn),
_ = ensure_plugin_config(NameVsn) _ = ensure_plugin_config({NameVsn, Mode})
else else
_ -> ok _ -> ok
end. end.
-spec ensure_plugin_config(name_vsn()) -> ok. -spec ensure_plugin_config({name_vsn(), ?fresh_install | ?normal}) -> ok.
ensure_plugin_config(NameVsn) -> ensure_plugin_config({NameVsn, ?normal}) ->
case get(?fresh_install) of ensure_plugin_config(NameVsn, [N || N <- mria:running_nodes(), N /= node()]);
true -> ensure_plugin_config({NameVsn, ?fresh_install}) ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "default_plugin_config_used", msg => "default_plugin_config_used",
name_vsn => NameVsn, name_vsn => NameVsn,
reason => "fresh_install" hint => "fresh_install"
}), }),
cp_default_config_file(NameVsn); cp_default_config_file(NameVsn).
_ ->
%% fetch plugin hocon config from cluster
ensure_plugin_config(NameVsn, [N || N <- mria:running_nodes(), N /= node()])
end.
-spec ensure_plugin_config(name_vsn(), list()) -> ok. -spec ensure_plugin_config(name_vsn(), list()) -> ok.
ensure_plugin_config(NameVsn, []) -> ensure_plugin_config(NameVsn, []) ->