fix: ensure avro file

This commit is contained in:
JimMoen 2024-05-17 15:48:21 +08:00
parent df7dcb2764
commit a7f2f95318
No known key found for this signature in database
3 changed files with 134 additions and 44 deletions

View File

@ -67,7 +67,8 @@
-export([ -export([
decode_plugin_avro_config/2, decode_plugin_avro_config/2,
install_dir/0, install_dir/0,
avsc_file_path/1 avsc_file_path/1,
with_plugin_avsc/1
]). ]).
%% `emqx_config_handler' API %% `emqx_config_handler' API
@ -332,6 +333,15 @@ decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
{error, ReasonMap} -> {error, ReasonMap} {error, ReasonMap} -> {error, ReasonMap}
end. end.
-spec with_plugin_avsc(name_vsn()) -> boolean().
with_plugin_avsc(NameVsn) ->
case read_plugin_info(NameVsn, #{fill_readme => false}) of
{ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) ->
WithAvsc;
_ ->
false
end.
get_config_interal(Key, Default) when is_atom(Key) -> get_config_interal(Key, Default) when is_atom(Key) ->
get_config_interal([Key], Default); get_config_interal([Key], Default);
get_config_interal(Path, Default) -> get_config_interal(Path, Default) ->
@ -438,7 +448,6 @@ do_ensure_installed(NameVsn) ->
ok = write_tar_file_content(install_dir(), TarContent), ok = write_tar_file_content(install_dir(), TarContent),
case read_plugin_info(NameVsn, #{}) of case read_plugin_info(NameVsn, #{}) of
{ok, _} -> {ok, _} ->
ok = maybe_post_op_after_install(NameVsn),
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
@ -572,6 +581,8 @@ do_ensure_started(NameVsn) ->
case ensure_exists_and_installed(NameVsn) of case ensure_exists_and_installed(NameVsn) of
ok -> ok ->
Plugin = do_read_plugin(NameVsn), Plugin = do_read_plugin(NameVsn),
%% ok = ensure_avro_config(NameVsn);
ok = maybe_post_op_after_install(NameVsn),
ok = load_code_start_apps(NameVsn, Plugin); ok = load_code_start_apps(NameVsn, Plugin);
{error, plugin_not_found} -> {error, plugin_not_found} ->
?SLOG(error, #{ ?SLOG(error, #{
@ -611,16 +622,16 @@ tryit(WhichOp, F) ->
read_plugin_info(NameVsn, Options) -> read_plugin_info(NameVsn, Options) ->
tryit( tryit(
atom_to_list(?FUNCTION_NAME), atom_to_list(?FUNCTION_NAME),
fun() -> {ok, do_read_plugin2(NameVsn, Options)} end fun() -> {ok, do_read_plugin(NameVsn, Options)} end
). ).
do_read_plugin(NameVsn) -> do_read_plugin(NameVsn) ->
do_read_plugin2(NameVsn, #{}). do_read_plugin(NameVsn, #{}).
do_read_plugin2(NameVsn, Option) -> do_read_plugin(NameVsn, Option) ->
do_read_plugin3(NameVsn, info_file_path(NameVsn), Option). do_read_plugin(NameVsn, info_file_path(NameVsn), Option).
do_read_plugin3(NameVsn, InfoFilePath, Options) -> do_read_plugin(NameVsn, InfoFilePath, Options) ->
{ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(), {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(),
Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath),
Info1 = plugins_readme(NameVsn, Options, Info0), Info1 = plugins_readme(NameVsn, Options, Info0),
@ -659,8 +670,7 @@ ensure_exists_and_installed(NameVsn) ->
case get_tar(NameVsn) of case get_tar(NameVsn) of
{ok, TarContent} -> {ok, TarContent} ->
ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = file:write_file(pkg_file_path(NameVsn), TarContent),
ok = do_ensure_installed(NameVsn), ok = do_ensure_installed(NameVsn);
ok = ensure_avro_config(NameVsn);
_ -> _ ->
%% If not, try to get it from the cluster. %% If not, try to get it from the cluster.
do_get_from_cluster(NameVsn) do_get_from_cluster(NameVsn)
@ -698,6 +708,16 @@ get_from_any_node([Node | T], NameVsn, Errors) ->
get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
end. end.
get_config_from_any_node([], _NameVsn, Errors) ->
{error, Errors};
get_config_from_any_node([Node | T], NameVsn, Errors) ->
case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of
{ok, _} = Res ->
Res;
Err ->
get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
end.
plugins_readme(NameVsn, #{fill_readme := true}, Info) -> plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
case file:read_file(readme_file(NameVsn)) of case file:read_file(readme_file(NameVsn)) of
{ok, Bin} -> Info#{readme => Bin}; {ok, Bin} -> Info#{readme => Bin};
@ -1031,13 +1051,15 @@ for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
maybe_post_op_after_install(NameVsn) -> maybe_post_op_after_install(NameVsn) ->
_ = maybe_load_config_schema(NameVsn), _ = maybe_load_config_schema(NameVsn),
_ = maybe_create_config_dir(NameVsn),
ok. ok.
maybe_load_config_schema(NameVsn) -> maybe_load_config_schema(NameVsn) ->
AvscPath = avsc_file_path(NameVsn), AvscPath = avsc_file_path(NameVsn),
filelib:is_regular(AvscPath) andalso _ =
do_load_config_schema(NameVsn, AvscPath). with_plugin_avsc(NameVsn) andalso
filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath),
_ = maybe_create_config_dir(NameVsn).
do_load_config_schema(NameVsn, AvscPath) -> do_load_config_schema(NameVsn, AvscPath) ->
case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
@ -1047,39 +1069,54 @@ do_load_config_schema(NameVsn, AvscPath) ->
end. end.
maybe_create_config_dir(NameVsn) -> maybe_create_config_dir(NameVsn) ->
ConfigDir = plugin_config_dir(NameVsn), with_plugin_avsc(NameVsn) andalso
case filelib:ensure_path(ConfigDir) of do_create_config_dir(NameVsn).
ok ->
_ = maybe_copy_default_avro_config(NameVsn), do_create_config_dir(NameVsn) ->
ok; case plugin_config_dir(NameVsn) of
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ {error, {gen_config_dir_failed, Reason}};
msg => "failed_to_create_plugin_config_dir", ConfigDir ->
dir => ConfigDir, case filelib:ensure_path(ConfigDir) of
reason => Reason ok ->
}), %% get config from other nodes or get from tarball
{error, {mkdir_failed, ConfigDir, Reason}} _ = maybe_ensure_plugin_config(NameVsn),
ok;
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_create_plugin_config_dir",
dir => ConfigDir,
reason => Reason
}),
{error, {mkdir_failed, ConfigDir, Reason}}
end
end. end.
maybe_copy_default_avro_config(NameVsn) -> maybe_ensure_plugin_config(NameVsn) ->
Source = default_avro_config_file(NameVsn), Nodes = [N || N <- mria:running_nodes(), N /= node()],
Destination = avro_config_file(NameVsn), case get_config_from_any_node(Nodes, NameVsn, []) of
filelib:is_regular(Source) andalso {ok, AvroBin} ->
case file:copy(Source, Destination) of ok = file:write_file(avro_config_file(NameVsn), AvroBin),
{ok, _} -> ensure_avro_config(NameVsn);
ok, _ ->
ensure_avro_config(NameVsn); %% always copy default avro file into config dir
{error, Reason} -> %% when can not get config from other nodes
?SLOG(warning, #{ Source = default_avro_config_file(NameVsn),
msg => "failed_to_copy_plugin_default_avro_config", Destination = avro_config_file(NameVsn),
source => Source, filelib:is_regular(Source) andalso
destination => Destination, case file:copy(Source, Destination) of
reason => Reason {ok, _} ->
}) ok,
end. ensure_avro_config(NameVsn);
{error, Reason} ->
%% ensure_avro_config() -> ?SLOG(warning, #{
%% ok = for_plugins(fun(NameVsn) -> ensure_avro_config(NameVsn) end). msg => "failed_to_copy_plugin_default_avro_config",
source => Source,
destination => Destination,
reason => Reason
})
end
end.
ensure_avro_config(NameVsn) -> ensure_avro_config(NameVsn) ->
case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of
@ -1182,9 +1219,19 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
plugin_dir(NameVsn) -> plugin_dir(NameVsn) ->
wrap_list_path(filename:join([install_dir(), NameVsn])). wrap_list_path(filename:join([install_dir(), NameVsn])).
-spec plugin_config_dir(name_vsn()) -> string(). -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
plugin_config_dir(NameVsn) -> plugin_config_dir(NameVsn) ->
wrap_list_path(filename:join([emqx:data_dir(), "plugins", NameVsn])). case parse_name_vsn(NameVsn) of
{ok, NameAtom, _Vsn} ->
wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]));
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_generate_plugin_config_dir_for_plugin",
plugin_namevsn => NameVsn,
reason => Reason
}),
{error, Reason}
end.
%% Files %% Files
-spec pkg_file_path(name_vsn()) -> string(). -spec pkg_file_path(name_vsn()) -> string().

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
get_tar/3 get_tar/3
]). ]).
@ -30,6 +31,9 @@
introduced_in() -> introduced_in() ->
"5.0.21". "5.0.21".
deprecated_since() ->
"5.7.0".
-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. -spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}.
get_tar(Node, NameVsn, Timeout) -> get_tar(Node, NameVsn, Timeout) ->
rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).

View File

@ -0,0 +1,39 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugins_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_tar/3,
get_config/3
]).
-include_lib("emqx/include/bpapi.hrl").
-type name_vsn() :: binary() | string().
introduced_in() ->
"5.7.0".
-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}.
get_tar(Node, NameVsn, Timeout) ->
rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).
get_config(Node, NameVsn, Timeout) ->
rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout).