Merge pull request #13269 from JimMoen/improve-plugin-behavior

fix: mark fresh install to cp the default configuration file directly
This commit is contained in:
zmstone 2024-06-24 09:17:25 +02:00 committed by GitHub
commit f9e17d6c25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 110 additions and 70 deletions

View File

@ -59,7 +59,6 @@
-define(VSN_WILDCARD, "-*.tar.gz").
-define(CONTENT_PLUGIN, plugin).
-define(CONTENT_CONFIG, config).
namespace() ->
"plugins".
@ -411,7 +410,7 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
NameVsn = string:trim(FileName, trailing, ".tar.gz"),
case emqx_plugins:describe(NameVsn) of
{error, #{error_msg := "bad_info_file", reason := {enoent, _}}} ->
{error, #{msg := "bad_info_file", reason := {enoent, _Path}}} ->
case emqx_plugins:parse_name_vsn(FileName) of
{ok, AppName, _Vsn} ->
AppDir = filename:join(emqx_plugins:install_dir(), AppName),
@ -467,12 +466,12 @@ do_install_package(FileName, Bin) ->
),
Reason =
case hd(Filtered) of
{error, #{error_msg := Reason0}} -> Reason0;
{error, #{msg := Reason0}} -> Reason0;
{error, #{reason := Reason0}} -> Reason0
end,
{400, #{
code => 'BAD_PLUGIN_INFO',
message => iolist_to_binary([Reason, ": ", FileName])
message => iolist_to_binary([bin(Reason), ": ", FileName])
}}
end.
@ -565,8 +564,8 @@ install_package(FileName, Bin) ->
ok = filelib:ensure_dir(File),
ok = file:write_file(File, Bin),
PackageName = string:trim(FileName, trailing, ".tar.gz"),
case emqx_plugins:ensure_installed(PackageName) of
{error, #{reason := not_found}} = NotFound ->
case emqx_plugins:ensure_installed(PackageName, ?fresh_install) of
{error, #{reason := plugin_not_found}} = NotFound ->
NotFound;
{error, Reason} = Error ->
?SLOG(error, Reason#{msg => "failed_to_install_plugin"}),
@ -597,6 +596,9 @@ delete_package(Name) ->
end.
%% for RPC plugin update
%% TODO: catch thrown error to return 400
%% - plugin_not_found
%% - otp vsn assertion failed
ensure_action(Name, start) ->
_ = emqx_plugins:ensure_started(Name),
_ = emqx_plugins:ensure_enabled(Name),
@ -625,10 +627,9 @@ do_update_plugin_config(NameVsn, AvroJsonMap, AvroValue) ->
return(Code, ok) ->
{Code};
return(_, {error, #{error_msg := "bad_info_file", reason := {enoent, _} = Reason}}) ->
{404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = Reason}}) ->
{404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
return(_, {error, #{msg := Msg, reason := {enoent, Path} = Reason}}) ->
?SLOG(error, #{msg => Msg, reason => Reason}),
{404, #{code => 'NOT_FOUND', message => iolist_to_binary([Path, " does not exist"])}};
return(_, {error, Reason}) ->
{400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}.
@ -728,6 +729,10 @@ format_plugin_avsc_and_i18n(_NameVsn) ->
#{avsc => null, i18n => null}.
-endif.
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> list_to_binary(L);
bin(B) when is_binary(B) -> B.
% running_status: running loaded, stopped
%% config_status: not_configured disable enable
plugin_status(#{running_status := running}) -> running;

View File

@ -26,6 +26,8 @@
-define(plugin_conf_not_found, plugin_conf_not_found).
-define(plugin_without_config_schema, plugin_without_config_schema).
-define(fresh_install, fresh_install).
-define(normal, normal).
-type schema_name() :: binary().
-type avsc_path() :: string().

View File

@ -39,6 +39,7 @@
-export([
ensure_installed/0,
ensure_installed/1,
ensure_installed/2,
ensure_uninstalled/1,
ensure_enabled/1,
ensure_enabled/2,
@ -169,19 +170,27 @@ ensure_installed(NameVsn) ->
case read_plugin_info(NameVsn, #{}) of
{ok, _} ->
ok,
_ = maybe_ensure_plugin_config(NameVsn);
_ = maybe_ensure_plugin_config(NameVsn, ?normal);
{error, _} ->
ok = purge(NameVsn),
case ensure_exists_and_installed(NameVsn) of
ok ->
maybe_post_op_after_installed(NameVsn),
_ = maybe_ensure_plugin_config(NameVsn),
maybe_post_op_after_installed(NameVsn, ?normal),
ok;
{error, _Reason} = Err ->
Err
end
end.
ensure_installed(NameVsn, ?fresh_install = Mode) ->
case ensure_exists_and_installed(NameVsn) of
ok ->
maybe_post_op_after_installed(NameVsn, Mode),
ok;
{error, _Reason} = Err ->
Err
end.
%% @doc Ensure files and directories for the given plugin are being deleted.
%% If a plugin is running, or enabled, an error is returned.
-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
@ -189,12 +198,12 @@ ensure_uninstalled(NameVsn) ->
case read_plugin_info(NameVsn, #{}) of
{ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
{error, #{
error_msg => "bad_plugin_running_status",
msg => "bad_plugin_running_status",
hint => "stop_the_plugin_first"
}};
{ok, #{config_status := enabled}} ->
{error, #{
error_msg => "bad_plugin_config_status",
msg => "bad_plugin_config_status",
hint => "disable_the_plugin_first"
}};
_ ->
@ -279,9 +288,9 @@ ensure_started(NameVsn) ->
case do_ensure_started(NameVsn) of
ok ->
ok;
{error, Reason} ->
?SLOG(alert, Reason#{msg => "failed_to_start_plugin"}),
{error, Reason}
{error, ReasonMap} ->
?SLOG(error, ReasonMap#{msg => "failed_to_start_plugin"}),
{error, ReasonMap}
end.
%% @doc Stop all plugins before broker stops.
@ -374,7 +383,7 @@ list() ->
{ok, Info} ->
{true, Info};
{error, Reason} ->
?SLOG(warning, Reason),
?SLOG(warning, Reason#{msg => "failed_to_read_plugin_info"}),
false
end
end,
@ -402,7 +411,10 @@ decode_plugin_config_map(NameVsn, AvroJsonMap) ->
do_decode_plugin_config_map(NameVsn, AvroJsonMap)
end;
false ->
?SLOG(debug, #{name_vsn => NameVsn, plugin_with_avro_schema => false}),
?SLOG(debug, #{
msg => "plugin_without_config_schema",
name_vsn => NameVsn
}),
{ok, ?plugin_without_config_schema}
end.
@ -537,13 +549,13 @@ do_ensure_installed(NameVsn) ->
end;
{error, {_, enoent}} ->
{error, #{
error_msg => "failed_to_extract_plugin_package",
msg => "failed_to_extract_plugin_package",
path => TarGz,
reason => not_found
reason => plugin_tarball_not_found
}};
{error, Reason} ->
{error, #{
error_msg => "bad_plugin_package",
msg => "bad_plugin_package",
path => TarGz,
reason => Reason
}}
@ -600,7 +612,7 @@ add_new_configured(Configured, {Action, NameVsn}, Item) ->
{Front, Rear} = lists:splitwith(SplitFun, Configured),
Rear =:= [] andalso
throw(#{
error_msg => "position_anchor_plugin_not_configured",
msg => "position_anchor_plugin_not_configured",
hint => "maybe_install_and_configure",
name_vsn => NameVsn
}),
@ -664,12 +676,13 @@ do_ensure_started(NameVsn) ->
ok ->
Plugin = do_read_plugin(NameVsn),
ok = load_code_start_apps(NameVsn, Plugin);
{error, plugin_not_found} ->
{error, #{reason := Reason} = ReasonMap} ->
?SLOG(error, #{
error_msg => "plugin_not_found",
name_vsn => NameVsn
msg => "failed_to_start_plugin",
name_vsn => NameVsn,
reason => Reason
}),
ok
{error, ReasonMap}
end
end
).
@ -682,10 +695,12 @@ tryit(WhichOp, F) ->
try
F()
catch
throw:ReasonMap ->
throw:ReasonMap when is_map(ReasonMap) ->
%% thrown exceptions are known errors
%% translate to a return value without stacktrace
{error, ReasonMap};
throw:Reason ->
{error, #{reason => Reason}};
error:Reason:Stacktrace ->
%% unexpected errors, log stacktrace
?SLOG(warning, #{
@ -769,18 +784,18 @@ do_get_from_cluster(NameVsn) ->
ok = do_ensure_installed(NameVsn);
{error, NodeErrors} when Nodes =/= [] ->
ErrMeta = #{
error_msg => "failed_to_copy_plugin_from_other_nodes",
msg => "failed_to_copy_plugin_from_other_nodes",
name_vsn => NameVsn,
node_errors => NodeErrors,
reason => not_found
reason => plugin_not_found
},
?SLOG(error, ErrMeta),
{error, ErrMeta};
{error, _} ->
ErrMeta = #{
error_msg => "no_nodes_to_copy_plugin_from",
msg => "no_nodes_to_copy_plugin_from",
name_vsn => NameVsn,
reason => not_found
reason => plugin_not_found
},
?SLOG(error, ErrMeta),
{error, ErrMeta}
@ -791,6 +806,11 @@ get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
{ok, _} = Res ->
?SLOG(debug, #{
msg => "get_plugin_tar_from_cluster_successfully",
node => Node,
name_vsn => NameVsn
}),
Res;
Err ->
get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
@ -805,6 +825,11 @@ get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
)
of
{ok, _} = Res ->
?SLOG(debug, #{
msg => "get_plugin_config_from_cluster_successfully",
node => Node,
name_vsn => NameVsn
}),
Res;
Err ->
get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
@ -870,7 +895,7 @@ check_plugin(
catch
_:_ ->
throw(#{
error_msg => "bad_rel_apps",
msg => "bad_rel_apps",
rel_apps => Apps,
hint => "A non-empty string list of app_name-app_vsn format"
})
@ -878,7 +903,7 @@ check_plugin(
Info;
false ->
throw(#{
error_msg => "name_vsn_mismatch",
msg => "name_vsn_mismatch",
name_vsn => NameVsn,
path => FilePath,
name => Name,
@ -887,7 +912,7 @@ check_plugin(
end;
check_plugin(_What, NameVsn, File) ->
throw(#{
error_msg => "bad_info_file_content",
msg => "bad_info_file_content",
mandatory_fields => [rel_vsn, name, rel_apps, description],
name_vsn => NameVsn,
path => File
@ -943,7 +968,7 @@ do_load_plugin_app(AppName, Ebin) ->
ok;
{error, Reason} ->
throw(#{
error_msg => "failed_to_load_plugin_beam",
msg => "failed_to_load_plugin_beam",
path => BeamFile,
reason => Reason
})
@ -958,7 +983,7 @@ do_load_plugin_app(AppName, Ebin) ->
ok;
{error, Reason} ->
throw(#{
error_msg => "failed_to_load_plugin_app",
msg => "failed_to_load_plugin_app",
name => AppName,
reason => Reason
})
@ -975,7 +1000,7 @@ start_app(App) ->
ok;
{error, {ErrApp, Reason}} ->
throw(#{
error_msg => "failed_to_start_plugin_app",
msg => "failed_to_start_plugin_app",
app => App,
err_app => ErrApp,
reason => Reason
@ -1057,7 +1082,7 @@ stop_app(App) ->
?SLOG(debug, #{msg => "plugin_not_started", app => App}),
ok = unload_moudle_and_app(App);
{error, Reason} ->
throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason})
throw(#{msg => "failed_to_stop_app", app => App, reason => Reason})
end.
unload_moudle_and_app(App) ->
@ -1152,13 +1177,13 @@ for_plugins(ActionFun) ->
for_plugins_action_error_occurred,
ErrMeta
),
?SLOG(error, ErrMeta),
?SLOG(error, ErrMeta#{msg => "for_plugins_action_error_occurred"}),
ok
end.
maybe_post_op_after_installed(NameVsn0) ->
maybe_post_op_after_installed(NameVsn0, Mode) ->
NameVsn = wrap_to_list(NameVsn0),
_ = maybe_load_config_schema(NameVsn),
_ = maybe_load_config_schema(NameVsn, Mode),
ok = maybe_ensure_state(NameVsn).
maybe_ensure_state(NameVsn) ->
@ -1183,13 +1208,13 @@ maybe_ensure_state(NameVsn) ->
end,
ok.
maybe_load_config_schema(NameVsn) ->
maybe_load_config_schema(NameVsn, Mode) ->
AvscPath = avsc_file_path(NameVsn),
_ =
with_plugin_avsc(NameVsn) andalso
filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath),
_ = maybe_create_config_dir(NameVsn).
_ = maybe_create_config_dir(NameVsn, Mode).
do_load_config_schema(NameVsn, AvscPath) ->
case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of
@ -1198,11 +1223,11 @@ do_load_config_schema(NameVsn, AvscPath) ->
{error, _Reason} -> ok
end.
maybe_create_config_dir(NameVsn) ->
maybe_create_config_dir(NameVsn, Mode) ->
with_plugin_avsc(NameVsn) andalso
do_create_config_dir(NameVsn).
do_create_config_dir(NameVsn, Mode).
do_create_config_dir(NameVsn) ->
do_create_config_dir(NameVsn, Mode) ->
case plugin_config_dir(NameVsn) of
{error, Reason} ->
{error, {gen_config_dir_failed, Reason}};
@ -1210,7 +1235,7 @@ do_create_config_dir(NameVsn) ->
case filelib:ensure_path(ConfigDir) of
ok ->
%% get config from other nodes or get from tarball
_ = maybe_ensure_plugin_config(NameVsn),
_ = maybe_ensure_plugin_config(NameVsn, Mode),
ok;
{error, Reason} ->
?SLOG(warning, #{
@ -1222,20 +1247,25 @@ do_create_config_dir(NameVsn) ->
end
end.
-spec maybe_ensure_plugin_config(name_vsn()) -> ok.
maybe_ensure_plugin_config(NameVsn) ->
-spec maybe_ensure_plugin_config(name_vsn(), ?fresh_install | ?normal) -> ok.
maybe_ensure_plugin_config(NameVsn, Mode) ->
maybe
true ?= with_plugin_avsc(NameVsn),
_ = ensure_plugin_config(NameVsn)
_ = ensure_plugin_config({NameVsn, Mode})
else
_ -> ok
end.
-spec ensure_plugin_config(name_vsn()) -> ok.
ensure_plugin_config(NameVsn) ->
%% fetch plugin hocon config from cluster
Nodes = [N || N <- mria:running_nodes(), N /= node()],
ensure_plugin_config(NameVsn, Nodes).
-spec ensure_plugin_config({name_vsn(), ?fresh_install | ?normal}) -> ok.
ensure_plugin_config({NameVsn, ?normal}) ->
ensure_plugin_config(NameVsn, [N || N <- mria:running_nodes(), N /= node()]);
ensure_plugin_config({NameVsn, ?fresh_install}) ->
?SLOG(debug, #{
msg => "default_plugin_config_used",
name_vsn => NameVsn,
hint => "fresh_install"
}),
cp_default_config_file(NameVsn).
-spec ensure_plugin_config(name_vsn(), list()) -> ok.
ensure_plugin_config(NameVsn, []) ->
@ -1255,8 +1285,6 @@ ensure_plugin_config(NameVsn, Nodes) ->
ensure_config_map(NameVsn);
_ ->
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
%% otherwise cp default hocon file
%% i.e. Clean installation
cp_default_config_file(NameVsn)
end.
@ -1292,6 +1320,11 @@ ensure_config_map(NameVsn) ->
true ->
do_ensure_config_map(NameVsn, ConfigJsonMap);
false ->
?SLOG(debug, #{
msg => "put_plugin_config_directly",
hint => "plugin_without_config_schema",
name_vsn => NameVsn
}),
put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema)
end;
_ ->
@ -1376,23 +1409,23 @@ prune_backup_files(Path) ->
Deletes
).
read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) ->
read_file_fun(Path, Msg, #{read_mode := ?RAW_BIN}) ->
fun() ->
case file:read_file(Path) of
{ok, Bin} ->
{ok, Bin};
{error, Reason} ->
ErrMeta = #{error_msg => ErrMsg, reason => Reason},
ErrMeta = #{msg => Msg, reason => Reason},
throw(ErrMeta)
end
end;
read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
read_file_fun(Path, Msg, #{read_mode := ?JSON_MAP}) ->
fun() ->
case hocon:load(Path, #{format => richmap}) of
{ok, RichMap} ->
{ok, hocon_maps:ensure_plain(RichMap)};
{error, Reason} ->
ErrMeta = #{error_msg => ErrMsg, reason => Reason},
ErrMeta = #{msg => Msg, reason => Reason},
throw(ErrMeta)
end
end.

View File

@ -353,7 +353,7 @@ t_enable_disable(Config) ->
?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
?assertMatch(
{error, #{
error_msg := "bad_plugin_config_status",
msg := "bad_plugin_config_status",
hint := "disable_the_plugin_first"
}},
emqx_plugins:ensure_uninstalled(NameVsn)
@ -381,7 +381,7 @@ t_bad_tar_gz(Config) ->
ok = file:write_file(FakeTarTz, "a\n"),
?assertMatch(
{error, #{
error_msg := "bad_plugin_package",
msg := "bad_plugin_package",
reason := eof
}},
emqx_plugins:ensure_installed("fake-vsn")
@ -389,8 +389,8 @@ t_bad_tar_gz(Config) ->
%% the plugin tarball can not be found on any nodes
?assertMatch(
{error, #{
error_msg := "no_nodes_to_copy_plugin_from",
reason := not_found
msg := "no_nodes_to_copy_plugin_from",
reason := plugin_not_found
}},
emqx_plugins:ensure_installed("nonexisting")
),
@ -463,7 +463,7 @@ t_bad_info_json(Config) ->
ok = write_info_file(Config, NameVsn, "bad-syntax"),
?assertMatch(
{error, #{
error_msg := "bad_info_file",
msg := "bad_info_file",
reason := {parse_error, _}
}},
emqx_plugins:describe(NameVsn)
@ -471,7 +471,7 @@ t_bad_info_json(Config) ->
ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"),
?assertMatch(
{error, #{
error_msg := "bad_info_file_content",
msg := "bad_info_file_content",
mandatory_fields := _
}},
emqx_plugins:describe(NameVsn)

View File

@ -57,7 +57,7 @@ read_plugin_test() ->
try
ok = write_file(InfoFile, FakeInfo),
?assertMatch(
{error, #{error_msg := "bad_rel_apps"}},
{error, #{msg := "bad_rel_apps"}},
emqx_plugins:read_plugin_info(NameVsn, #{})
)
after