diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 39354f756..1f98b2bd4 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -67,7 +67,8 @@ -export([ decode_plugin_avro_config/2, install_dir/0, - avsc_file_path/1 + avsc_file_path/1, + with_plugin_avsc/1 ]). %% `emqx_config_handler' API @@ -332,6 +333,15 @@ decode_plugin_avro_config(NameVsn, AvroJsonBin) -> {error, ReasonMap} -> {error, ReasonMap} end. +-spec with_plugin_avsc(name_vsn()) -> boolean(). +with_plugin_avsc(NameVsn) -> + case read_plugin_info(NameVsn, #{fill_readme => false}) of + {ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) -> + WithAvsc; + _ -> + false + end. + get_config_interal(Key, Default) when is_atom(Key) -> get_config_interal([Key], Default); get_config_interal(Path, Default) -> @@ -438,7 +448,6 @@ do_ensure_installed(NameVsn) -> ok = write_tar_file_content(install_dir(), TarContent), case read_plugin_info(NameVsn, #{}) of {ok, _} -> - ok = maybe_post_op_after_install(NameVsn), ok; {error, Reason} -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), @@ -572,6 +581,8 @@ do_ensure_started(NameVsn) -> case ensure_exists_and_installed(NameVsn) of ok -> Plugin = do_read_plugin(NameVsn), + %% ok = ensure_avro_config(NameVsn); + ok = maybe_post_op_after_install(NameVsn), ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ @@ -611,16 +622,16 @@ tryit(WhichOp, F) -> read_plugin_info(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - fun() -> {ok, do_read_plugin2(NameVsn, Options)} end + fun() -> {ok, do_read_plugin(NameVsn, Options)} end ). do_read_plugin(NameVsn) -> - do_read_plugin2(NameVsn, #{}). + do_read_plugin(NameVsn, #{}). -do_read_plugin2(NameVsn, Option) -> - do_read_plugin3(NameVsn, info_file_path(NameVsn), Option). +do_read_plugin(NameVsn, Option) -> + do_read_plugin(NameVsn, info_file_path(NameVsn), Option). -do_read_plugin3(NameVsn, InfoFilePath, Options) -> +do_read_plugin(NameVsn, InfoFilePath, Options) -> {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(), Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), Info1 = plugins_readme(NameVsn, Options, Info0), @@ -659,8 +670,7 @@ ensure_exists_and_installed(NameVsn) -> case get_tar(NameVsn) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), - ok = do_ensure_installed(NameVsn), - ok = ensure_avro_config(NameVsn); + ok = do_ensure_installed(NameVsn); _ -> %% If not, try to get it from the cluster. do_get_from_cluster(NameVsn) @@ -698,6 +708,16 @@ get_from_any_node([Node | T], NameVsn, Errors) -> get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. +get_config_from_any_node([], _NameVsn, Errors) -> + {error, Errors}; +get_config_from_any_node([Node | T], NameVsn, Errors) -> + case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of + {ok, _} = Res -> + Res; + Err -> + get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + end. + plugins_readme(NameVsn, #{fill_readme := true}, Info) -> case file:read_file(readme_file(NameVsn)) of {ok, Bin} -> Info#{readme => Bin}; @@ -1031,13 +1051,15 @@ for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> maybe_post_op_after_install(NameVsn) -> _ = maybe_load_config_schema(NameVsn), - _ = maybe_create_config_dir(NameVsn), ok. maybe_load_config_schema(NameVsn) -> AvscPath = avsc_file_path(NameVsn), - filelib:is_regular(AvscPath) andalso - do_load_config_schema(NameVsn, AvscPath). + _ = + with_plugin_avsc(NameVsn) andalso + filelib:is_regular(AvscPath) andalso + do_load_config_schema(NameVsn, AvscPath), + _ = maybe_create_config_dir(NameVsn). do_load_config_schema(NameVsn, AvscPath) -> case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of @@ -1047,39 +1069,54 @@ do_load_config_schema(NameVsn, AvscPath) -> end. maybe_create_config_dir(NameVsn) -> - ConfigDir = plugin_config_dir(NameVsn), - case filelib:ensure_path(ConfigDir) of - ok -> - _ = maybe_copy_default_avro_config(NameVsn), - ok; + with_plugin_avsc(NameVsn) andalso + do_create_config_dir(NameVsn). + +do_create_config_dir(NameVsn) -> + case plugin_config_dir(NameVsn) of {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_create_plugin_config_dir", - dir => ConfigDir, - reason => Reason - }), - {error, {mkdir_failed, ConfigDir, Reason}} + {error, {gen_config_dir_failed, Reason}}; + ConfigDir -> + case filelib:ensure_path(ConfigDir) of + ok -> + %% get config from other nodes or get from tarball + _ = maybe_ensure_plugin_config(NameVsn), + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_create_plugin_config_dir", + dir => ConfigDir, + reason => Reason + }), + {error, {mkdir_failed, ConfigDir, Reason}} + end end. -maybe_copy_default_avro_config(NameVsn) -> - Source = default_avro_config_file(NameVsn), - Destination = avro_config_file(NameVsn), - filelib:is_regular(Source) andalso - case file:copy(Source, Destination) of - {ok, _} -> - ok, - ensure_avro_config(NameVsn); - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", - source => Source, - destination => Destination, - reason => Reason - }) - end. - -%% ensure_avro_config() -> -%% ok = for_plugins(fun(NameVsn) -> ensure_avro_config(NameVsn) end). +maybe_ensure_plugin_config(NameVsn) -> + Nodes = [N || N <- mria:running_nodes(), N /= node()], + case get_config_from_any_node(Nodes, NameVsn, []) of + {ok, AvroBin} -> + ok = file:write_file(avro_config_file(NameVsn), AvroBin), + ensure_avro_config(NameVsn); + _ -> + %% always copy default avro file into config dir + %% when can not get config from other nodes + Source = default_avro_config_file(NameVsn), + Destination = avro_config_file(NameVsn), + filelib:is_regular(Source) andalso + case file:copy(Source, Destination) of + {ok, _} -> + ok, + ensure_avro_config(NameVsn); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_avro_config", + source => Source, + destination => Destination, + reason => Reason + }) + end + end. ensure_avro_config(NameVsn) -> case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of @@ -1182,9 +1219,19 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> plugin_dir(NameVsn) -> wrap_list_path(filename:join([install_dir(), NameVsn])). --spec plugin_config_dir(name_vsn()) -> string(). +-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. plugin_config_dir(NameVsn) -> - wrap_list_path(filename:join([emqx:data_dir(), "plugins", NameVsn])). + case parse_name_vsn(NameVsn) of + {ok, NameAtom, _Vsn} -> + wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)])); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_generate_plugin_config_dir_for_plugin", + plugin_namevsn => NameVsn, + reason => Reason + }), + {error, Reason} + end. %% Files -spec pkg_file_path(name_vsn()) -> string(). diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl index bc1e31473..04e06337b 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, get_tar/3 ]). @@ -30,6 +31,9 @@ introduced_in() -> "5.0.21". +deprecated_since() -> + "5.7.0". + -spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl new file mode 100644 index 000000000..480082e28 --- /dev/null +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_tar/3, + get_config/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-type name_vsn() :: binary() | string(). + +introduced_in() -> + "5.7.0". + +-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. +get_tar(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). + +get_config(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout).