From 4bc80ee48386ed7300ad35a6a03221301fd234a0 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 15 May 2024 18:21:00 +0800 Subject: [PATCH 01/15] build: unsupported otp23 and otp24 casued `maybe_expr` introduced --- scripts/ensure-rebar3.sh | 11 +---------- scripts/get-otp-vsn.sh | 2 +- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/scripts/ensure-rebar3.sh b/scripts/ensure-rebar3.sh index 054deabd4..14b6ad7c7 100755 --- a/scripts/ensure-rebar3.sh +++ b/scripts/ensure-rebar3.sh @@ -4,17 +4,8 @@ set -euo pipefail [ "${DEBUG:-0}" -eq 1 ] && set -x -## rebar3 tag 3.19.0-emqx-1 is compiled using latest official OTP-24 image. -## we have to use an otp24-compiled rebar3 because the defination of record #application{} -## in systools.hrl is changed in otp24. OTP_VSN="${OTP_VSN:-$(./scripts/get-otp-vsn.sh)}" case ${OTP_VSN} in - 23*) - VERSION="3.16.1-emqx-1" - ;; - 24*) - VERSION="3.18.0-emqx-1" - ;; 25*) VERSION="3.19.0-emqx-9" ;; @@ -22,7 +13,7 @@ case ${OTP_VSN} in VERSION="3.20.0-emqx-1" ;; *) - echo "Unsupporetd Erlang/OTP version $OTP_VSN" + echo "Unsupported Erlang/OTP version $OTP_VSN" exit 1 ;; esac diff --git a/scripts/get-otp-vsn.sh b/scripts/get-otp-vsn.sh index 699a16f3f..d21e2c830 100755 --- a/scripts/get-otp-vsn.sh +++ b/scripts/get-otp-vsn.sh @@ -2,4 +2,4 @@ set -euo pipefail -erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().' +erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().' From e8d4e8811825ac03ca930bc73295aa576b31117e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 16 May 2024 17:31:02 +0800 Subject: [PATCH 02/15] fix(plugin): serde not found on new-joined nodes --- apps/emqx_plugins/src/emqx_plugins_app.erl | 2 +- apps/emqx_plugins/src/emqx_plugins_serde.erl | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins_app.erl b/apps/emqx_plugins/src/emqx_plugins_app.erl index a8b2fd6d6..740bcdd22 100644 --- a/apps/emqx_plugins/src/emqx_plugins_app.erl +++ b/apps/emqx_plugins/src/emqx_plugins_app.erl @@ -27,8 +27,8 @@ start(_Type, _Args) -> %% load all pre-configured - ok = emqx_plugins:ensure_started(), {ok, Sup} = emqx_plugins_sup:start_link(), + ok = emqx_plugins:ensure_started(), ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins), {ok, Sup}. diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index b50258b9c..fdc46e661 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -33,7 +33,6 @@ init/1, handle_call/3, handle_cast/2, - handle_continue/2, terminate/2 ]). @@ -126,11 +125,10 @@ init(_) -> ]), State = #{}, AvscPaths = get_plugin_avscs(), - {ok, State, {continue, {build_serdes, AvscPaths}}}. - -handle_continue({build_serdes, AvscPaths}, State) -> + %% force build all schemas at startup + %% otherwise plugin schema may not be available when needed _ = build_serdes(AvscPaths), - {noreply, State}. + {ok, State}. handle_call({build_serdes, NameVsn, AvscPath}, _From, State) -> BuildRes = do_build_serde({NameVsn, AvscPath}), From e0e4517d9e1b1eae0bf3e527278a5ca3e31bf220 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 17 May 2024 09:10:10 +0800 Subject: [PATCH 03/15] fix: ensure plugin config on boot --- .../src/emqx_mgmt_api_plugins.erl | 4 +- apps/emqx_plugins/src/emqx_plugins.erl | 46 +++++++++++++++++-- apps/emqx_plugins/src/emqx_plugins_serde.erl | 4 +- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 94b16320a..bda053dee 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -599,9 +599,9 @@ ensure_action(Name, restart) -> ok. %% for RPC plugin avro encoded config update -do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) -> +do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) -> %% TODO: maybe use `PluginConfigMap` to validate config - emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap). + emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 108ef1386..dc8be43fa 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -659,7 +659,8 @@ ensure_exists_and_installed(NameVsn) -> case get_tar(NameVsn) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), - ok = do_ensure_installed(NameVsn); + ok = do_ensure_installed(NameVsn), + ok = ensure_avro_config(NameVsn); _ -> %% If not, try to get it from the cluster. do_get_from_cluster(NameVsn) @@ -1046,6 +1047,7 @@ maybe_create_config_dir(NameVsn) -> ConfigDir = plugin_config_dir(NameVsn), case filelib:ensure_path(ConfigDir) of ok -> + _ = maybe_copy_default_avro_config(NameVsn), ok; {error, Reason} -> ?SLOG(warning, #{ @@ -1056,6 +1058,37 @@ maybe_create_config_dir(NameVsn) -> {error, {mkdir_failed, ConfigDir, Reason}} end. +maybe_copy_default_avro_config(NameVsn) -> + Source = default_avro_config_file(NameVsn), + Destination = avro_config_file(NameVsn), + filelib:is_regular(Source) andalso + case file:copy(Source, Destination) of + {ok, _} -> + ok, + ensure_avro_config(NameVsn); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_avro_config", + source => Source, + destination => Destination, + reason => Reason + }) + end. + +%% ensure_avro_config() -> +%% ok = for_plugins(fun(NameVsn) -> ensure_avro_config(NameVsn) end). + +ensure_avro_config(NameVsn) -> + case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of + {ok, AvroJsonMap} -> + {ok, Config} = decode_plugin_avro_config( + NameVsn, emqx_utils_json:encode(AvroJsonMap) + ), + put_config(NameVsn, AvroJsonMap, Config); + _ -> + ok + end. + %% @private Backup the current config to a file with a timestamp suffix and %% then save the new config to the config file. backup_and_write_avro_bin(NameVsn, AvroBin) -> @@ -1148,7 +1181,7 @@ plugin_dir(NameVsn) -> -spec plugin_config_dir(name_vsn()) -> string(). plugin_config_dir(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])). + wrap_list_path(filename:join([emqx:data_dir(), "plugins", NameVsn])). %% Files -spec pkg_file_path(name_vsn()) -> string(). @@ -1161,15 +1194,20 @@ info_file_path(NameVsn) -> -spec avsc_file_path(name_vsn()) -> string(). avsc_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config_schema.avsc"])). -spec avro_config_file(name_vsn()) -> string(). avro_config_file(NameVsn) -> wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])). +%% should only used when plugin installing +-spec default_avro_config_file(name_vsn()) -> string(). +default_avro_config_file(NameVsn) -> + wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config.avro"])). + -spec i18n_file_path(name_vsn()) -> string(). i18n_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config_i18n.json"])). -spec readme_file(name_vsn()) -> string(). readme_file(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index fdc46e661..7328c33ff 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -151,10 +151,10 @@ terminate(_Reason, _State) -> -spec get_plugin_avscs() -> [{string(), string()}]. get_plugin_avscs() -> - Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), + Pattern = filename:join([emqx_plugins:install_dir(), "*", "config", "config_schema.avsc"]), lists:foldl( fun(AvscPath, AccIn) -> - [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + [_, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)), [{to_bin(NameVsn), AvscPath} | AccIn] end, _Acc0 = [], From df7dcb2764227e9ade893804919d9ab09db87a64 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 17 May 2024 14:16:10 +0800 Subject: [PATCH 04/15] fix: do not let plugin start failed lead emqx start failed --- apps/emqx_plugins/src/emqx_plugins.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index dc8be43fa..39354f756 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -1013,8 +1013,11 @@ configured() -> for_plugins(ActionFun) -> case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of - [] -> ok; - Errors -> erlang:error(#{function => ActionFun, errors => Errors}) + [] -> + ok; + Errors -> + ?SLOG(error, #{function => ActionFun, errors => Errors}), + ok end. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> From a7f2f95318971f010ee288c5c334dbbc0a7dc402 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 17 May 2024 15:48:21 +0800 Subject: [PATCH 05/15] fix: ensure avro file --- apps/emqx_plugins/src/emqx_plugins.erl | 135 ++++++++++++------ .../src/proto/emqx_plugins_proto_v1.erl | 4 + .../src/proto/emqx_plugins_proto_v2.erl | 39 +++++ 3 files changed, 134 insertions(+), 44 deletions(-) create mode 100644 apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 39354f756..1f98b2bd4 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -67,7 +67,8 @@ -export([ decode_plugin_avro_config/2, install_dir/0, - avsc_file_path/1 + avsc_file_path/1, + with_plugin_avsc/1 ]). %% `emqx_config_handler' API @@ -332,6 +333,15 @@ decode_plugin_avro_config(NameVsn, AvroJsonBin) -> {error, ReasonMap} -> {error, ReasonMap} end. +-spec with_plugin_avsc(name_vsn()) -> boolean(). +with_plugin_avsc(NameVsn) -> + case read_plugin_info(NameVsn, #{fill_readme => false}) of + {ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) -> + WithAvsc; + _ -> + false + end. + get_config_interal(Key, Default) when is_atom(Key) -> get_config_interal([Key], Default); get_config_interal(Path, Default) -> @@ -438,7 +448,6 @@ do_ensure_installed(NameVsn) -> ok = write_tar_file_content(install_dir(), TarContent), case read_plugin_info(NameVsn, #{}) of {ok, _} -> - ok = maybe_post_op_after_install(NameVsn), ok; {error, Reason} -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), @@ -572,6 +581,8 @@ do_ensure_started(NameVsn) -> case ensure_exists_and_installed(NameVsn) of ok -> Plugin = do_read_plugin(NameVsn), + %% ok = ensure_avro_config(NameVsn); + ok = maybe_post_op_after_install(NameVsn), ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ @@ -611,16 +622,16 @@ tryit(WhichOp, F) -> read_plugin_info(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - fun() -> {ok, do_read_plugin2(NameVsn, Options)} end + fun() -> {ok, do_read_plugin(NameVsn, Options)} end ). do_read_plugin(NameVsn) -> - do_read_plugin2(NameVsn, #{}). + do_read_plugin(NameVsn, #{}). -do_read_plugin2(NameVsn, Option) -> - do_read_plugin3(NameVsn, info_file_path(NameVsn), Option). +do_read_plugin(NameVsn, Option) -> + do_read_plugin(NameVsn, info_file_path(NameVsn), Option). -do_read_plugin3(NameVsn, InfoFilePath, Options) -> +do_read_plugin(NameVsn, InfoFilePath, Options) -> {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(), Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), Info1 = plugins_readme(NameVsn, Options, Info0), @@ -659,8 +670,7 @@ ensure_exists_and_installed(NameVsn) -> case get_tar(NameVsn) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), - ok = do_ensure_installed(NameVsn), - ok = ensure_avro_config(NameVsn); + ok = do_ensure_installed(NameVsn); _ -> %% If not, try to get it from the cluster. do_get_from_cluster(NameVsn) @@ -698,6 +708,16 @@ get_from_any_node([Node | T], NameVsn, Errors) -> get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. +get_config_from_any_node([], _NameVsn, Errors) -> + {error, Errors}; +get_config_from_any_node([Node | T], NameVsn, Errors) -> + case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of + {ok, _} = Res -> + Res; + Err -> + get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + end. + plugins_readme(NameVsn, #{fill_readme := true}, Info) -> case file:read_file(readme_file(NameVsn)) of {ok, Bin} -> Info#{readme => Bin}; @@ -1031,13 +1051,15 @@ for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> maybe_post_op_after_install(NameVsn) -> _ = maybe_load_config_schema(NameVsn), - _ = maybe_create_config_dir(NameVsn), ok. maybe_load_config_schema(NameVsn) -> AvscPath = avsc_file_path(NameVsn), - filelib:is_regular(AvscPath) andalso - do_load_config_schema(NameVsn, AvscPath). + _ = + with_plugin_avsc(NameVsn) andalso + filelib:is_regular(AvscPath) andalso + do_load_config_schema(NameVsn, AvscPath), + _ = maybe_create_config_dir(NameVsn). do_load_config_schema(NameVsn, AvscPath) -> case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of @@ -1047,39 +1069,54 @@ do_load_config_schema(NameVsn, AvscPath) -> end. maybe_create_config_dir(NameVsn) -> - ConfigDir = plugin_config_dir(NameVsn), - case filelib:ensure_path(ConfigDir) of - ok -> - _ = maybe_copy_default_avro_config(NameVsn), - ok; + with_plugin_avsc(NameVsn) andalso + do_create_config_dir(NameVsn). + +do_create_config_dir(NameVsn) -> + case plugin_config_dir(NameVsn) of {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_create_plugin_config_dir", - dir => ConfigDir, - reason => Reason - }), - {error, {mkdir_failed, ConfigDir, Reason}} + {error, {gen_config_dir_failed, Reason}}; + ConfigDir -> + case filelib:ensure_path(ConfigDir) of + ok -> + %% get config from other nodes or get from tarball + _ = maybe_ensure_plugin_config(NameVsn), + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_create_plugin_config_dir", + dir => ConfigDir, + reason => Reason + }), + {error, {mkdir_failed, ConfigDir, Reason}} + end end. -maybe_copy_default_avro_config(NameVsn) -> - Source = default_avro_config_file(NameVsn), - Destination = avro_config_file(NameVsn), - filelib:is_regular(Source) andalso - case file:copy(Source, Destination) of - {ok, _} -> - ok, - ensure_avro_config(NameVsn); - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", - source => Source, - destination => Destination, - reason => Reason - }) - end. - -%% ensure_avro_config() -> -%% ok = for_plugins(fun(NameVsn) -> ensure_avro_config(NameVsn) end). +maybe_ensure_plugin_config(NameVsn) -> + Nodes = [N || N <- mria:running_nodes(), N /= node()], + case get_config_from_any_node(Nodes, NameVsn, []) of + {ok, AvroBin} -> + ok = file:write_file(avro_config_file(NameVsn), AvroBin), + ensure_avro_config(NameVsn); + _ -> + %% always copy default avro file into config dir + %% when can not get config from other nodes + Source = default_avro_config_file(NameVsn), + Destination = avro_config_file(NameVsn), + filelib:is_regular(Source) andalso + case file:copy(Source, Destination) of + {ok, _} -> + ok, + ensure_avro_config(NameVsn); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_avro_config", + source => Source, + destination => Destination, + reason => Reason + }) + end + end. ensure_avro_config(NameVsn) -> case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of @@ -1182,9 +1219,19 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> plugin_dir(NameVsn) -> wrap_list_path(filename:join([install_dir(), NameVsn])). --spec plugin_config_dir(name_vsn()) -> string(). +-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. plugin_config_dir(NameVsn) -> - wrap_list_path(filename:join([emqx:data_dir(), "plugins", NameVsn])). + case parse_name_vsn(NameVsn) of + {ok, NameAtom, _Vsn} -> + wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)])); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_generate_plugin_config_dir_for_plugin", + plugin_namevsn => NameVsn, + reason => Reason + }), + {error, Reason} + end. %% Files -spec pkg_file_path(name_vsn()) -> string(). diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl index bc1e31473..04e06337b 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, get_tar/3 ]). @@ -30,6 +31,9 @@ introduced_in() -> "5.0.21". +deprecated_since() -> + "5.7.0". + -spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl new file mode 100644 index 000000000..480082e28 --- /dev/null +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_tar/3, + get_config/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-type name_vsn() :: binary() | string(). + +introduced_in() -> + "5.7.0". + +-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. +get_tar(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). + +get_config(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout). From 140b7ce51ed636b902d3e13d64d64bcc50c3c326 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 May 2024 11:06:02 +0800 Subject: [PATCH 06/15] fix(plugin): schema content only provided in enterprise edition --- apps/emqx_management/src/emqx_mgmt_api_plugins.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index bda053dee..5835b9120 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -695,10 +695,15 @@ aggregate_status([{Node, Plugins} | List], Acc) -> aggregate_status(List, NewAcc). format_plugin_avsc_and_i18n(NameVsn) -> - #{ - avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), - i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) - }. + case emqx_release:edition() of + ee -> + #{ + avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + }; + ce -> + #{avsc => null, i18n => null} + end. try_read_file(Fun) -> case Fun() of From 2076681e7608c2e989289c132495fadeaca099dd Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 17 May 2024 17:11:19 +0800 Subject: [PATCH 07/15] fix: put plugin config with binary namevsn key --- apps/emqx_plugins/include/emqx_plugins.hrl | 2 + apps/emqx_plugins/src/emqx_plugins.appup.src | 8 -- apps/emqx_plugins/src/emqx_plugins.erl | 82 ++++++++++++------- .../src/proto/emqx_plugins_proto_v2.erl | 6 +- 4 files changed, 59 insertions(+), 39 deletions(-) delete mode 100644 apps/emqx_plugins/src/emqx_plugins.appup.src diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index f822b9c8d..d1411884f 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -24,6 +24,8 @@ -define(CONFIG_FORMAT_AVRO, config_format_avro). -define(CONFIG_FORMAT_MAP, config_format_map). +-define(plugin_conf_not_found, plugin_conf_not_found). + -type schema_name() :: binary(). -type avsc_path() :: string(). diff --git a/apps/emqx_plugins/src/emqx_plugins.appup.src b/apps/emqx_plugins/src/emqx_plugins.appup.src deleted file mode 100644 index f9474dd33..000000000 --- a/apps/emqx_plugins/src/emqx_plugins.appup.src +++ /dev/null @@ -1,8 +0,0 @@ -%% -*- mode: erlang -*- -{"0.1.0", - [ {<<".*">>, []} - ], - [ - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 1f98b2bd4..156edce0b 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -80,7 +80,10 @@ -export([get_tar/1]). %% Internal export --export([do_ensure_started/1]). +-export([ + ensure_avro_config/1, + do_ensure_started/1 +]). %% for test cases -export([put_config_internal/2]). @@ -150,7 +153,8 @@ ensure_installed(NameVsn) -> ok; {error, _} -> ok = purge(NameVsn), - do_ensure_installed(NameVsn) + do_ensure_installed(NameVsn), + ok = maybe_post_op_after_install(NameVsn) end. %% @doc Ensure files and directories for the given plugin are being deleted. @@ -288,9 +292,13 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> %% @doc Update plugin's config. %% RPC call from Management API or CLI. %% the avro Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> + put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig); put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), + %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), + %% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok. @@ -581,8 +589,6 @@ do_ensure_started(NameVsn) -> case ensure_exists_and_installed(NameVsn) of ok -> Plugin = do_read_plugin(NameVsn), - %% ok = ensure_avro_config(NameVsn); - ok = maybe_post_op_after_install(NameVsn), ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ @@ -614,7 +620,11 @@ tryit(WhichOp, F) -> exception => Reason, stacktrace => Stacktrace }), - {error, {failed, WhichOp}} + {error, #{ + which_op => WhichOp, + exception => Reason, + stacktrace => Stacktrace + }} end. %% read plugin info from the JSON file @@ -708,10 +718,14 @@ get_from_any_node([Node | T], NameVsn, Errors) -> get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. -get_config_from_any_node([], _NameVsn, Errors) -> +get_avro_config_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_config_from_any_node([Node | T], NameVsn, Errors) -> - case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of +get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> + case + emqx_plugins_proto_v2:get_config( + Node, NameVsn, #{format => ?CONFIG_FORMAT_MAP}, ?plugin_conf_not_found, 5_000 + ) + of {ok, _} = Res -> Res; Err -> @@ -1041,6 +1055,8 @@ for_plugins(ActionFun) -> end. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> + %% always ensure the plugin is installed + ok = ensure_avro_config(NameVsn), case Fun(NameVsn) of ok -> []; {error, Reason} -> [{NameVsn, Reason}] @@ -1094,31 +1110,41 @@ do_create_config_dir(NameVsn) -> maybe_ensure_plugin_config(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_config_from_any_node(Nodes, NameVsn, []) of - {ok, AvroBin} -> - ok = file:write_file(avro_config_file(NameVsn), AvroBin), + case get_avro_config_from_any_node(Nodes, NameVsn, []) of + {ok, AvroJsonMap} when is_map(AvroJsonMap) -> + AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), + ok = file:write_file(avro_config_file(NameVsn), AvroJsonBin), ensure_avro_config(NameVsn); + %% {ok, ?plugin_conf_not_found} -> _ -> - %% always copy default avro file into config dir - %% when can not get config from other nodes - Source = default_avro_config_file(NameVsn), - Destination = avro_config_file(NameVsn), - filelib:is_regular(Source) andalso - case file:copy(Source, Destination) of - {ok, _} -> - ok, - ensure_avro_config(NameVsn); - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", - source => Source, - destination => Destination, - reason => Reason - }) - end + cp_default_avro_file(NameVsn), + ensure_avro_config(NameVsn) end. +cp_default_avro_file(NameVsn) -> + %% always copy default avro file into config dir + %% when can not get config from other nodes + Source = default_avro_config_file(NameVsn), + Destination = avro_config_file(NameVsn), + filelib:is_regular(Source) andalso + case file:copy(Source, Destination) of + {ok, _} -> + ok, + ensure_avro_config(NameVsn); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_avro_config", + source => Source, + destination => Destination, + reason => Reason + }) + end. + ensure_avro_config(NameVsn) -> + with_plugin_avsc(NameVsn) andalso + do_ensure_avro_config(NameVsn). + +do_ensure_avro_config(NameVsn) -> case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of {ok, AvroJsonMap} -> {ok, Config} = decode_plugin_avro_config( diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 480082e28..01d0a5ce1 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -21,7 +21,7 @@ -export([ introduced_in/0, get_tar/3, - get_config/3 + get_config/5 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -35,5 +35,5 @@ introduced_in() -> get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). -get_config(Node, NameVsn, Timeout) -> - rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout). +get_config(Node, NameVsn, Opts, Default, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opts, Default], Timeout). From 677352e49875a64c06369a6afa8c304ac07e49e4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 May 2024 15:38:54 +0800 Subject: [PATCH 08/15] fix: ensure installed and plugin state on boot --- apps/emqx_plugins/src/emqx_plugins.erl | 44 +++++++++++++++------- apps/emqx_plugins/src/emqx_plugins_app.erl | 1 + 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 156edce0b..81dc90c2e 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -34,6 +34,7 @@ %% Package operations -export([ + ensure_installed/0, ensure_installed/1, ensure_uninstalled/1, ensure_enabled/1, @@ -145,6 +146,15 @@ make_name_vsn_string(Name, Vsn) -> %%-------------------------------------------------------------------- %% Package operations +%% @doc Start all configured plugins are started. +-spec ensure_installed() -> ok. +ensure_installed() -> + Fun = fun(#{name_vsn := NameVsn}) -> + ensure_installed(NameVsn), + [] + end, + ok = for_plugins(Fun). + %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, map()}. ensure_installed(NameVsn) -> @@ -235,7 +245,17 @@ delete_package(NameVsn) -> %% @doc Start all configured plugins are started. -spec ensure_started() -> ok. ensure_started() -> - ok = for_plugins(fun ?MODULE:do_ensure_started/1). + Fun = fun + (#{name_vsn := NameVsn, enable := true}) -> + case do_ensure_started(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; + (#{name_vsn := NameVsn, enable := false}) -> + ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}), + [] + end, + ok = for_plugins(Fun). %% @doc Start a plugin from Management API or CLI. %% the input is a - string. @@ -252,7 +272,11 @@ ensure_started(NameVsn) -> %% @doc Stop all plugins before broker stops. -spec ensure_stopped() -> ok. ensure_stopped() -> - for_plugins(fun ?MODULE:ensure_stopped/1). + Fun = fun(#{name_vsn := NameVsn, enable := true}) -> + ensure_stopped(NameVsn), + [] + end, + ok = for_plugins(Fun). %% @doc Stop a plugin from Management API or CLI. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}. @@ -297,6 +321,8 @@ put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), + %% TODO: callback in plugin's on_config_changed (config update by mgmt API) + %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), %% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), @@ -1046,7 +1072,7 @@ configured() -> get_config_interal(states, []). for_plugins(ActionFun) -> - case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of + case lists:flatmap(ActionFun, configured()) of [] -> ok; Errors -> @@ -1054,18 +1080,8 @@ for_plugins(ActionFun) -> ok end. -for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> - %% always ensure the plugin is installed - ok = ensure_avro_config(NameVsn), - case Fun(NameVsn) of - ok -> []; - {error, Reason} -> [{NameVsn, Reason}] - end; -for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> - ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}), - []. - maybe_post_op_after_install(NameVsn) -> + _ = ensure_state(NameVsn, _Position = no_move, _Enabled = false, _ConfLocation = global), _ = maybe_load_config_schema(NameVsn), ok. diff --git a/apps/emqx_plugins/src/emqx_plugins_app.erl b/apps/emqx_plugins/src/emqx_plugins_app.erl index 740bcdd22..10e483c22 100644 --- a/apps/emqx_plugins/src/emqx_plugins_app.erl +++ b/apps/emqx_plugins/src/emqx_plugins_app.erl @@ -28,6 +28,7 @@ start(_Type, _Args) -> %% load all pre-configured {ok, Sup} = emqx_plugins_sup:start_link(), + ok = emqx_plugins:ensure_installed(), ok = emqx_plugins:ensure_started(), ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins), {ok, Sup}. From 1abc8cf5021557dcc33ff091e0f54160a6594991 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 May 2024 16:04:29 +0800 Subject: [PATCH 09/15] fix: ensure plugin's config on boot and join cluster --- apps/emqx_plugins/src/emqx_plugins.erl | 38 ++++++++++++++++++++------ 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 81dc90c2e..ab295e082 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -16,6 +16,8 @@ -module(emqx_plugins). +-feature(maybe_expr, enable). + -include_lib("emqx/include/logger.hrl"). -include("emqx_plugins.hrl"). @@ -160,11 +162,16 @@ ensure_installed() -> ensure_installed(NameVsn) -> case read_plugin_info(NameVsn, #{}) of {ok, _} -> - ok; + ok, + _ = maybe_ensure_plugin_config(NameVsn); {error, _} -> ok = purge(NameVsn), - do_ensure_installed(NameVsn), - ok = maybe_post_op_after_install(NameVsn) + case do_ensure_installed(NameVsn) of + ok -> + maybe_post_op_after_installed(NameVsn); + _ -> + ok + end end. %% @doc Ensure files and directories for the given plugin are being deleted. @@ -1080,9 +1087,9 @@ for_plugins(ActionFun) -> ok end. -maybe_post_op_after_install(NameVsn) -> - _ = ensure_state(NameVsn, _Position = no_move, _Enabled = false, _ConfLocation = global), +maybe_post_op_after_installed(NameVsn) -> _ = maybe_load_config_schema(NameVsn), + _ = ensure_state(NameVsn, no_move, false, global), ok. maybe_load_config_schema(NameVsn) -> @@ -1125,6 +1132,14 @@ do_create_config_dir(NameVsn) -> end. maybe_ensure_plugin_config(NameVsn) -> + maybe + true ?= filelib:is_regular(avro_config_file(NameVsn)), + ensure_avro_config(NameVsn) + else + _ -> do_ensure_plugin_config(NameVsn) + end. + +do_ensure_plugin_config(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], case get_avro_config_from_any_node(Nodes, NameVsn, []) of {ok, AvroJsonMap} when is_map(AvroJsonMap) -> @@ -1142,11 +1157,13 @@ cp_default_avro_file(NameVsn) -> %% when can not get config from other nodes Source = default_avro_config_file(NameVsn), Destination = avro_config_file(NameVsn), - filelib:is_regular(Source) andalso + maybe + true ?= filelib:is_regular(Source), + %% destination path not existed (not configured) + true ?= (not filelib:is_regular(Destination)), case file:copy(Source, Destination) of {ok, _} -> - ok, - ensure_avro_config(NameVsn); + ok; {error, Reason} -> ?SLOG(warning, #{ msg => "failed_to_copy_plugin_default_avro_config", @@ -1154,7 +1171,10 @@ cp_default_avro_file(NameVsn) -> destination => Destination, reason => Reason }) - end. + end + else + _ -> ensure_avro_config(NameVsn) + end. ensure_avro_config(NameVsn) -> with_plugin_avsc(NameVsn) andalso From 3ce091e9d775ef9c9fb0ecd705853774688f4881 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 May 2024 16:31:11 +0800 Subject: [PATCH 10/15] fix: ensure plugin tarball and extracted --- apps/emqx_plugins/src/emqx_plugins.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index ab295e082..aa693286f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -166,7 +166,7 @@ ensure_installed(NameVsn) -> _ = maybe_ensure_plugin_config(NameVsn); {error, _} -> ok = purge(NameVsn), - case do_ensure_installed(NameVsn) of + case ensure_exists_and_installed(NameVsn) of ok -> maybe_post_op_after_installed(NameVsn); _ -> @@ -303,7 +303,7 @@ get_config(Name, Vsn, Options, Default) -> {ok, plugin_config()} | {error, term()}. get_config(NameVsn) -> - get_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). + get_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}). -spec get_config(name_vsn(), Options :: map()) -> {ok, avro_binary() | plugin_config()} @@ -317,8 +317,9 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> get_config(NameVsn, Options, #{}). +%% Present default config value only in map format. get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> - {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}. + {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. %% @doc Update plugin's config. %% RPC call from Management API or CLI. @@ -1133,21 +1134,24 @@ do_create_config_dir(NameVsn) -> maybe_ensure_plugin_config(NameVsn) -> maybe - true ?= filelib:is_regular(avro_config_file(NameVsn)), - ensure_avro_config(NameVsn) + true ?= with_plugin_avsc(NameVsn), + _ = do_ensure_plugin_config(NameVsn) else - _ -> do_ensure_plugin_config(NameVsn) + _ -> ok end. do_ensure_plugin_config(NameVsn) -> + %% fetch plugin avro config from cluster Nodes = [N || N <- mria:running_nodes(), N /= node()], case get_avro_config_from_any_node(Nodes, NameVsn, []) of {ok, AvroJsonMap} when is_map(AvroJsonMap) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = file:write_file(avro_config_file(NameVsn), AvroJsonBin), ensure_avro_config(NameVsn); - %% {ok, ?plugin_conf_not_found} -> _ -> + ?SLOG(warning, #{msg => "config_not_found_from_cluster"}), + %% otherwise cp default avro file + %% i.e. Clean installation cp_default_avro_file(NameVsn), ensure_avro_config(NameVsn) end. From 87b3b214b98c29ceb399fc9d24e78e06d4a7ecac Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 May 2024 18:46:41 +0800 Subject: [PATCH 11/15] fix: make static_check happy --- apps/emqx/priv/bpapi.versions | 1 + .../src/emqx_mgmt_api_plugins.erl | 23 +++++++++------- apps/emqx_plugins/src/emqx_plugins.erl | 26 +++++++++++-------- .../src/proto/emqx_plugins_proto_v1.erl | 4 --- .../src/proto/emqx_plugins_proto_v2.erl | 1 + 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 10d27fc63..7c78d43d9 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -64,6 +64,7 @@ {emqx_node_rebalance_status,2}. {emqx_persistent_session_ds,1}. {emqx_plugins,1}. +{emqx_plugins,2}. {emqx_prometheus,1}. {emqx_prometheus,2}. {emqx_resource,1}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 5835b9120..fdfbd864a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -21,6 +21,8 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_plugins/include/emqx_plugins.hrl"). +-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}). + -export([ api_spec/0, fields/1, @@ -534,7 +536,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> {error, Reason} -> {400, #{code => 'BAD_POSITION', message => Reason}}; Position -> - case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of + case emqx_plugins:ensure_enabled(Name, Position, global) of ok -> {204}; {error, Reason} -> @@ -694,16 +696,12 @@ aggregate_status([{Node, Plugins} | List], Acc) -> ), aggregate_status(List, NewAcc). +-if(?EMQX_RELEASE_EDITION == ee). format_plugin_avsc_and_i18n(NameVsn) -> - case emqx_release:edition() of - ee -> - #{ - avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), - i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) - }; - ce -> - #{avsc => null, i18n => null} - end. + #{ + avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + }. try_read_file(Fun) -> case Fun() of @@ -711,6 +709,11 @@ try_read_file(Fun) -> _ -> null end. +-else. +format_plugin_avsc_and_i18n(_NameVsn) -> + #{avsc => null, i18n => null}. +-endif. + % running_status: running loaded, stopped %% config_status: not_configured disable enable plugin_status(#{running_status := running}) -> running; diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index aa693286f..f00b663b6 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -152,8 +152,10 @@ make_name_vsn_string(Name, Vsn) -> -spec ensure_installed() -> ok. ensure_installed() -> Fun = fun(#{name_vsn := NameVsn}) -> - ensure_installed(NameVsn), - [] + case ensure_installed(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end end, ok = for_plugins(Fun). @@ -169,8 +171,8 @@ ensure_installed(NameVsn) -> case ensure_exists_and_installed(NameVsn) of ok -> maybe_post_op_after_installed(NameVsn); - _ -> - ok + {error, _Reason} = Err -> + Err end end. @@ -280,8 +282,10 @@ ensure_started(NameVsn) -> -spec ensure_stopped() -> ok. ensure_stopped() -> Fun = fun(#{name_vsn := NameVsn, enable := true}) -> - ensure_stopped(NameVsn), - [] + case ensure_stopped(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end end, ok = for_plugins(Fun). @@ -314,7 +318,7 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> {ok, _AvroJson} = Res -> Res; {error, _Reason} = Err -> Err end; -get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> +get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP} = Options) -> get_config(NameVsn, Options, #{}). %% Present default config value only in map format. @@ -714,7 +718,7 @@ ensure_exists_and_installed(NameVsn) -> case get_tar(NameVsn) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), - ok = do_ensure_installed(NameVsn); + do_ensure_installed(NameVsn); _ -> %% If not, try to get it from the cluster. do_get_from_cluster(NameVsn) @@ -733,13 +737,13 @@ do_get_from_cluster(NameVsn) -> name_vsn => NameVsn, node_errors => NodeErrors }), - {error, plugin_not_found}; + {error, #{reason => not_found, name_vsn => NameVsn}}; {error, _} -> ?SLOG(error, #{ msg => "no_nodes_to_copy_plugin_from", name_vsn => NameVsn }), - {error, plugin_not_found} + {error, #{reason => not_found, name_vsn => NameVsn}} end. get_from_any_node([], _NameVsn, Errors) -> @@ -763,7 +767,7 @@ get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> {ok, _} = Res -> Res; Err -> - get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_avro_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. plugins_readme(NameVsn, #{fill_readme := true}, Info) -> diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl index 04e06337b..bc1e31473 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl @@ -20,7 +20,6 @@ -export([ introduced_in/0, - deprecated_since/0, get_tar/3 ]). @@ -31,9 +30,6 @@ introduced_in() -> "5.0.21". -deprecated_since() -> - "5.7.0". - -spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 01d0a5ce1..68b8090a3 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -35,5 +35,6 @@ introduced_in() -> get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). +-spec get_config(node(), name_vsn(), map(), any(), timeout()) -> {ok, any()} | {error, any()}. get_config(Node, NameVsn, Opts, Default, Timeout) -> rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opts, Default], Timeout). From 5abd23af5ae393374659003bf682ea1b49927e9c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 21 May 2024 15:13:25 +0800 Subject: [PATCH 12/15] test: plugin refactor --- apps/emqx_plugins/src/emqx_plugins.erl | 48 ++++++++++++------- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 43 ++++++++++++++--- 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index f00b663b6..da5bca312 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -18,8 +18,9 @@ -feature(maybe_expr, enable). --include_lib("emqx/include/logger.hrl"). -include("emqx_plugins.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -281,11 +282,15 @@ ensure_started(NameVsn) -> %% @doc Stop all plugins before broker stops. -spec ensure_stopped() -> ok. ensure_stopped() -> - Fun = fun(#{name_vsn := NameVsn, enable := true}) -> - case ensure_stopped(NameVsn) of - ok -> []; - {error, Reason} -> [{NameVsn, Reason}] - end + Fun = fun + (#{name_vsn := NameVsn, enable := true}) -> + case ensure_stopped(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; + (#{name_vsn := NameVsn, enable := false}) -> + ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}), + [] end, ok = for_plugins(Fun). @@ -732,18 +737,22 @@ do_get_from_cluster(NameVsn) -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); {error, NodeErrors} when Nodes =/= [] -> - ?SLOG(error, #{ - msg => "failed_to_copy_plugin_from_other_nodes", + ErrMeta = #{ + error_msg => "failed_to_copy_plugin_from_other_nodes", name_vsn => NameVsn, - node_errors => NodeErrors - }), - {error, #{reason => not_found, name_vsn => NameVsn}}; + node_errors => NodeErrors, + reason => not_found + }, + ?SLOG(error, ErrMeta), + {error, ErrMeta}; {error, _} -> - ?SLOG(error, #{ - msg => "no_nodes_to_copy_plugin_from", - name_vsn => NameVsn - }), - {error, #{reason => not_found, name_vsn => NameVsn}} + ErrMeta = #{ + error_msg => "no_nodes_to_copy_plugin_from", + name_vsn => NameVsn, + reason => not_found + }, + ?SLOG(error, ErrMeta), + {error, ErrMeta} end. get_from_any_node([], _NameVsn, Errors) -> @@ -1088,7 +1097,12 @@ for_plugins(ActionFun) -> [] -> ok; Errors -> - ?SLOG(error, #{function => ActionFun, errors => Errors}), + ErrMeta = #{function => ActionFun, errors => Errors}, + ?tp( + for_plugins_action_error_occurred, + ErrMeta + ), + ?SLOG(error, ErrMeta), ok end. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 80f3d7a48..5c8c51f1e 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin). -define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)). @@ -273,9 +274,15 @@ t_start_restart_and_stop(Config) -> %% fake enable bar-2 ok = ensure_state(Bar2, rear, true), %% should cause an error - ?assertError( - #{function := _, errors := [_ | _]}, - emqx_plugins:ensure_started() + ?check_trace( + emqx_plugins:ensure_started(), + fun(Trace) -> + ?assertMatch( + [#{function := _, errors := [_ | _]}], + ?of_kind(for_plugins_action_error_occurred, Trace) + ), + ok + end ), %% but demo plugin should still be running assert_app_running(?EMQX_PLUGIN_APP_NAME, true), @@ -337,7 +344,7 @@ t_enable_disable({'end', Config}) -> t_enable_disable(Config) -> NameVsn = proplists:get_value(name_vsn, Config), ok = emqx_plugins:ensure_installed(NameVsn), - ?assertEqual([], emqx_plugins:configured()), + ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()), ok = emqx_plugins:ensure_enabled(NameVsn), ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()), ok = emqx_plugins:ensure_disabled(NameVsn), @@ -379,9 +386,10 @@ t_bad_tar_gz(Config) -> }}, emqx_plugins:ensure_installed("fake-vsn") ), + %% the plugin tarball can not be found on any nodes ?assertMatch( {error, #{ - error_msg := "failed_to_extract_plugin_package", + error_msg := "no_nodes_to_copy_plugin_from", reason := not_found }}, emqx_plugins:ensure_installed("nonexisting") @@ -556,7 +564,7 @@ t_load_config_from_cli({'end', Config}) -> t_load_config_from_cli(Config) when is_list(Config) -> NameVsn = ?config(name_vsn, Config), ok = emqx_plugins:ensure_installed(NameVsn), - ?assertEqual([], emqx_plugins:configured()), + ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()), ok = emqx_plugins:ensure_enabled(NameVsn), ok = emqx_plugins:ensure_started(NameVsn), Params0 = unused, @@ -687,6 +695,14 @@ group_t_copy_plugin_to_a_new_node(Config) -> %% see: emqx_conf_app:init_conf/0 ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]), {ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]), + + %% Plugin config should be synced from `CopyFromNode` + %% by application `emqx` and `emqx_conf` + %% FIXME: in test case, we manually do it here + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []), + ?assertMatch( {ok, #{running_status := running, config_status := enabled}}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn]) @@ -739,6 +755,16 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) -> ct:pal("~p install_dir:\n ~p", [ CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir]) ]), + + %% Plugin config should be synced from `CopyFromNode` + %% by application `emqx` and `emqx_conf` + %% FIXME: in test case, we manually do it here + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [ + [states], [#{enable => true, name_vsn => NameVsn}] + ]), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []), + ?assertMatch( {ok, #{running_status := running, config_status := enabled}}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn]) @@ -785,6 +811,11 @@ group_t_cluster_leave(Config) -> ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]), ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]), ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]), + + ok = erpc:call(N2, emqx_plugins, ensure_installed, [NameVsn]), + ok = erpc:call(N2, emqx_plugins, ensure_started, [NameVsn]), + ok = erpc:call(N2, emqx_plugins, ensure_enabled, [NameVsn]), + Params = unused, %% 2 nodes running ?assertMatch( From 14f2a687996730cffa138fb794b01a7a65a0698f Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 22 May 2024 03:05:31 +0800 Subject: [PATCH 13/15] fix: bpapi spec type --- .../src/emqx_mgmt_api_plugins.erl | 11 +++--- apps/emqx_plugins/src/emqx_plugins.erl | 36 +++++++++++-------- .../src/proto/emqx_plugins_proto_v2.erl | 11 +++--- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index fdfbd864a..ca0d3ac10 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -180,6 +180,9 @@ schema("/plugins/:name/config") -> responses => #{ %% avro data, json encoded 200 => hoconsc:mk(binary()), + 400 => emqx_dashboard_swagger:error_codes( + ['BAD_CONFIG'], <<"Plugin Config Not Found">> + ), 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } }, @@ -490,13 +493,13 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> plugin_config(get, #{bindings := #{name := NameVsn}}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:get_config(NameVsn) of - {ok, AvroJson} -> + case emqx_plugins:get_config(NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found) of + {ok, AvroJson} when is_map(AvroJson) -> {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; - {error, _} -> + {ok, ?plugin_conf_not_found} -> {400, #{ code => 'BAD_CONFIG', - message => <<"Failed to get plugin config">> + message => <<"Plugin Config Not Found">> }} end; _ -> diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index da5bca312..11ef113b0 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -305,30 +305,36 @@ ensure_stopped(NameVsn) -> end ). -get_config(Name, Vsn, Options, Default) -> - get_config(make_name_vsn_string(Name, Vsn), Options, Default). +get_config(Name, Vsn, Opt, Default) -> + get_config(make_name_vsn_string(Name, Vsn), Opt, Default). -spec get_config(name_vsn()) -> - {ok, plugin_config()} + {ok, plugin_config() | any()} | {error, term()}. get_config(NameVsn) -> - get_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}). + get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}). --spec get_config(name_vsn(), Options :: map()) -> - {ok, avro_binary() | plugin_config()} +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_AVRO) -> + {ok, avro_binary() | plugin_config() | any()} | {error, term()}. -get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> +get_config(NameVsn, ?CONFIG_FORMAT_MAP) -> + get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}); +get_config(NameVsn, ?CONFIG_FORMAT_AVRO) -> + get_config_bin(NameVsn). + +%% Present default config value only in map format. +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) -> + {ok, plugin_config() | any()} + | {error, term()}. +get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) -> + {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. + +get_config_bin(NameVsn) -> %% no default value when get raw binary config case read_plugin_avro(NameVsn) of {ok, _AvroJson} = Res -> Res; {error, _Reason} = Err -> Err - end; -get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP} = Options) -> - get_config(NameVsn, Options, #{}). - -%% Present default config value only in map format. -get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> - {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. + end. %% @doc Update plugin's config. %% RPC call from Management API or CLI. @@ -770,7 +776,7 @@ get_avro_config_from_any_node([], _NameVsn, Errors) -> get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v2:get_config( - Node, NameVsn, #{format => ?CONFIG_FORMAT_MAP}, ?plugin_conf_not_found, 5_000 + Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000 ) of {ok, _} = Res -> diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 68b8090a3..0e933d351 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -24,6 +24,7 @@ get_config/5 ]). +-include("emqx_plugins.hrl"). -include_lib("emqx/include/bpapi.hrl"). -type name_vsn() :: binary() | string(). @@ -31,10 +32,12 @@ introduced_in() -> "5.7.0". --spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. +-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any()}. get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). --spec get_config(node(), name_vsn(), map(), any(), timeout()) -> {ok, any()} | {error, any()}. -get_config(Node, NameVsn, Opts, Default, Timeout) -> - rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opts, Default], Timeout). +-spec get_config( + node(), name_vsn(), ?CONFIG_FORMAT_MAP, any(), timeout() +) -> {ok, map() | any()} | {error, any()}. +get_config(Node, NameVsn, Opt, Default, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opt, Default], Timeout). From 33aa61daeab671c69a9a570b8339b6c3a21cdda9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 22 May 2024 06:37:58 +0800 Subject: [PATCH 14/15] fix: use hocon format as plugin config --- apps/emqx_plugins/src/emqx_plugins.erl | 46 ++++++++++++-------- apps/emqx_plugins/src/emqx_plugins_serde.erl | 4 +- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 11ef113b0..1a8a679c6 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -342,8 +342,8 @@ get_config_bin(NameVsn) -> put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig); put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), + HoconBin = hocon_pp:do(AvroJsonMap, #{}), + ok = backup_and_write_avro_bin(NameVsn, HoconBin), %% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), @@ -717,7 +717,7 @@ read_plugin_avro(NameVsn) -> read_plugin_avro(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options) + read_file_fun(plugin_config_file(NameVsn), "bad_avro_file", Options) ). ensure_exists_and_installed(NameVsn) -> @@ -1170,21 +1170,21 @@ do_ensure_plugin_config(NameVsn) -> case get_avro_config_from_any_node(Nodes, NameVsn, []) of {ok, AvroJsonMap} when is_map(AvroJsonMap) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = file:write_file(avro_config_file(NameVsn), AvroJsonBin), + ok = file:write_file(plugin_config_file(NameVsn), AvroJsonBin), ensure_avro_config(NameVsn); _ -> - ?SLOG(warning, #{msg => "config_not_found_from_cluster"}), + ?SLOG(info, #{msg => "config_not_found_from_cluster"}), %% otherwise cp default avro file %% i.e. Clean installation - cp_default_avro_file(NameVsn), + cp_default_config_file(NameVsn), ensure_avro_config(NameVsn) end. -cp_default_avro_file(NameVsn) -> +cp_default_config_file(NameVsn) -> %% always copy default avro file into config dir %% when can not get config from other nodes - Source = default_avro_config_file(NameVsn), - Destination = avro_config_file(NameVsn), + Source = default_plugin_config_file(NameVsn), + Destination = plugin_config_file(NameVsn), maybe true ?= filelib:is_regular(Source), %% destination path not existed (not configured) @@ -1224,7 +1224,7 @@ do_ensure_avro_config(NameVsn) -> backup_and_write_avro_bin(NameVsn, AvroBin) -> %% this may fail, but we don't care %% e.g. read-only file system - Path = avro_config_file(NameVsn), + Path = plugin_config_file(NameVsn), _ = filelib:ensure_dir(Path), TmpFile = Path ++ ".tmp", case file:write_file(TmpFile, AvroBin) of @@ -1309,6 +1309,16 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> plugin_dir(NameVsn) -> wrap_list_path(filename:join([install_dir(), NameVsn])). +-spec plugin_priv_dir(name_vsn()) -> string(). +plugin_priv_dir(NameVsn) -> + case read_plugin_info(NameVsn, #{fill_readme => false}) of + {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> + AppDir = <>, + wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); + _ -> + wrap_list_path(filename:join([install_dir(), NameVsn, "priv"])) + end. + -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. plugin_config_dir(NameVsn) -> case parse_name_vsn(NameVsn) of @@ -1334,20 +1344,20 @@ info_file_path(NameVsn) -> -spec avsc_file_path(name_vsn()) -> string(). avsc_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config_schema.avsc"])). + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])). --spec avro_config_file(name_vsn()) -> string(). -avro_config_file(NameVsn) -> - wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])). +-spec plugin_config_file(name_vsn()) -> string(). +plugin_config_file(NameVsn) -> + wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.hocon"])). %% should only used when plugin installing --spec default_avro_config_file(name_vsn()) -> string(). -default_avro_config_file(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config.avro"])). +-spec default_plugin_config_file(name_vsn()) -> string(). +default_plugin_config_file(NameVsn) -> + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])). -spec i18n_file_path(name_vsn()) -> string(). i18n_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config", "config_i18n.json"])). + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])). -spec readme_file(name_vsn()) -> string(). readme_file(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index 7328c33ff..b76f71991 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -151,10 +151,10 @@ terminate(_Reason, _State) -> -spec get_plugin_avscs() -> [{string(), string()}]. get_plugin_avscs() -> - Pattern = filename:join([emqx_plugins:install_dir(), "*", "config", "config_schema.avsc"]), + Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]), lists:foldl( fun(AvscPath, AccIn) -> - [_, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + [_, _, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)), [{to_bin(NameVsn), AvscPath} | AccIn] end, _Acc0 = [], From e5f7aa981738977e1b28d7a0dcf6863ae596b6a4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 22 May 2024 11:08:55 +0800 Subject: [PATCH 15/15] refactor: plguin functions and types rename --- .../src/emqx_mgmt_api_plugins.erl | 6 +- apps/emqx_plugins/include/emqx_plugins.hrl | 12 +- apps/emqx_plugins/src/emqx_plugins.erl | 160 +++++++++--------- .../src/proto/emqx_plugins_proto_v2.erl | 2 - 4 files changed, 91 insertions(+), 89 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index ca0d3ac10..ed5f016b3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -508,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) -> plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of + case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of {ok, AvroValueConfig} -> Nodes = emqx:running_nodes(), %% cluster call with config in map (binary key-value) @@ -702,8 +702,8 @@ aggregate_status([{Node, Plugins} | List], Acc) -> -if(?EMQX_RELEASE_EDITION == ee). format_plugin_avsc_and_i18n(NameVsn) -> #{ - avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), - i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end) }. try_read_file(Fun) -> diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index d1411884f..66f23d4b0 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -21,7 +21,7 @@ -define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab). --define(CONFIG_FORMAT_AVRO, config_format_avro). +-define(CONFIG_FORMAT_BIN, config_format_bin). -define(CONFIG_FORMAT_MAP, config_format_map). -define(plugin_conf_not_found, plugin_conf_not_found). @@ -32,6 +32,16 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). +%% "my_plugin-0.1.0" +-type name_vsn() :: binary() | string(). +%% the parse result of the JSON info file +-type plugin_info() :: map(). +-type schema_json_map() :: map(). +-type i18n_json_map() :: map(). +-type raw_plugin_config_content() :: binary(). +-type plugin_config_map() :: map(). +-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. + -record(plugin_schema_serde, { name :: schema_name(), eval_context :: term(), diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 1a8a679c6..ae97e7f5f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -28,9 +28,9 @@ -export([ describe/1, - plugin_avsc/1, - plugin_i18n/1, - plugin_avro/1, + plugin_schema_json/1, + plugin_i18n_json/1, + raw_plugin_config_content/1, parse_name_vsn/1, make_name_vsn_string/2 ]). @@ -69,7 +69,7 @@ %% Package utils -export([ - decode_plugin_avro_config/2, + decode_plugin_config_map/2, install_dir/0, avsc_file_path/1, with_plugin_avsc/1 @@ -85,7 +85,7 @@ %% Internal export -export([ - ensure_avro_config/1, + ensure_config_map/1, do_ensure_started/1 ]). %% for test cases @@ -104,36 +104,26 @@ -define(MAX_KEEP_BACKUP_CONFIGS, 10). -%% "my_plugin-0.1.0" --type name_vsn() :: binary() | string(). -%% the parse result of the JSON info file --type plugin() :: map(). --type schema_json() :: map(). --type i18n_json() :: map(). --type avro_binary() :: binary(). --type plugin_config() :: map(). --type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Describe a plugin. --spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. +-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}. describe(NameVsn) -> read_plugin_info(NameVsn, #{fill_readme => true}). --spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}. -plugin_avsc(NameVsn) -> +-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}. +plugin_schema_json(NameVsn) -> read_plugin_avsc(NameVsn). --spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}. -plugin_i18n(NameVsn) -> +-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}. +plugin_i18n_json(NameVsn) -> read_plugin_i18n(NameVsn). --spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}. -plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn). +-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}. +raw_plugin_config_content(NameVsn) -> + read_plugin_hocon(NameVsn). parse_name_vsn(NameVsn) when is_binary(NameVsn) -> parse_name_vsn(binary_to_list(NameVsn)); @@ -309,46 +299,44 @@ get_config(Name, Vsn, Opt, Default) -> get_config(make_name_vsn_string(Name, Vsn), Opt, Default). -spec get_config(name_vsn()) -> - {ok, plugin_config() | any()} + {ok, plugin_config_map() | any()} | {error, term()}. get_config(NameVsn) -> get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}). --spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_AVRO) -> - {ok, avro_binary() | plugin_config() | any()} +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) -> + {ok, raw_plugin_config_content() | plugin_config_map() | any()} | {error, term()}. get_config(NameVsn, ?CONFIG_FORMAT_MAP) -> get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}); -get_config(NameVsn, ?CONFIG_FORMAT_AVRO) -> +get_config(NameVsn, ?CONFIG_FORMAT_BIN) -> get_config_bin(NameVsn). %% Present default config value only in map format. -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) -> - {ok, plugin_config() | any()} + {ok, plugin_config_map() | any()} | {error, term()}. get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) -> {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. get_config_bin(NameVsn) -> %% no default value when get raw binary config - case read_plugin_avro(NameVsn) of - {ok, _AvroJson} = Res -> Res; + case read_plugin_hocon(NameVsn) of + {ok, _Map} = Res -> Res; {error, _Reason} = Err -> Err end. %% @doc Update plugin's config. %% RPC call from Management API or CLI. -%% the avro Json Map and plugin config ALWAYS be valid before calling this function. -put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> - put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig); -put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> - HoconBin = hocon_pp:do(AvroJsonMap, #{}), - ok = backup_and_write_avro_bin(NameVsn, HoconBin), +%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> + put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig); +put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) -> + HoconBin = hocon_pp:do(ConfigJsonMap, #{}), + ok = backup_and_write_hocon_bin(NameVsn, HoconBin), %% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) - %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), - %% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap), - ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap), ok. %% @doc Stop and then start the plugin. @@ -360,7 +348,7 @@ restart(NameVsn) -> %% @doc List all installed plugins. %% Including the ones that are installed, but not enabled in config. --spec list() -> [plugin()]. +-spec list() -> [plugin_info()]. list() -> Pattern = filename:join([install_dir(), "*", "release.json"]), All = lists:filtermap( @@ -381,10 +369,10 @@ list() -> %%-------------------------------------------------------------------- %% Package utils --spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. -decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> - decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap)); -decode_plugin_avro_config(NameVsn, AvroJsonBin) -> +-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. +decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> + decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap)); +decode_plugin_config_map(NameVsn, AvroJsonBin) -> case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of {ok, Config} -> {ok, Config}; {error, ReasonMap} -> {error, ReasonMap} @@ -712,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) -> read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options) ). -read_plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}). -read_plugin_avro(NameVsn, Options) -> +read_plugin_hocon(NameVsn) -> + read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}). +read_plugin_hocon(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(plugin_config_file(NameVsn), "bad_avro_file", Options) + read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options) ). ensure_exists_and_installed(NameVsn) -> @@ -738,7 +726,7 @@ ensure_exists_and_installed(NameVsn) -> do_get_from_cluster(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_from_any_node(Nodes, NameVsn, []) of + case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); @@ -761,19 +749,19 @@ do_get_from_cluster(NameVsn) -> {error, ErrMeta} end. -get_from_any_node([], _NameVsn, Errors) -> +get_plugin_tar_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_from_any_node([Node | T], NameVsn, Errors) -> +get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of {ok, _} = Res -> Res; Err -> - get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. -get_avro_config_from_any_node([], _NameVsn, Errors) -> +get_plugin_config_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> +get_plugin_config_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v2:get_config( Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000 @@ -782,7 +770,7 @@ get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> {ok, _} = Res -> Res; Err -> - get_avro_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. plugins_readme(NameVsn, #{fill_readme := true}, Info) -> @@ -1159,30 +1147,37 @@ do_create_config_dir(NameVsn) -> maybe_ensure_plugin_config(NameVsn) -> maybe true ?= with_plugin_avsc(NameVsn), - _ = do_ensure_plugin_config(NameVsn) + _ = ensure_plugin_config(NameVsn) else _ -> ok end. -do_ensure_plugin_config(NameVsn) -> - %% fetch plugin avro config from cluster +ensure_plugin_config(NameVsn) -> + %% fetch plugin hocon config from cluster Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_avro_config_from_any_node(Nodes, NameVsn, []) of - {ok, AvroJsonMap} when is_map(AvroJsonMap) -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = file:write_file(plugin_config_file(NameVsn), AvroJsonBin), - ensure_avro_config(NameVsn); + ensure_plugin_config(NameVsn, Nodes). +ensure_plugin_config(NameVsn, []) -> + ?SLOG(debug, #{ + msg => "default_plugin_config_used", + name_vsn => NameVsn, + reason => "no_other_running_nodes" + }), + cp_default_config_file(NameVsn); +ensure_plugin_config(NameVsn, Nodes) -> + case get_plugin_config_from_any_node(Nodes, NameVsn, []) of + {ok, ConfigMap} when is_map(ConfigMap) -> + HoconBin = hocon_pp:do(ConfigMap, #{}), + ok = file:write_file(plugin_config_file(NameVsn), HoconBin), + ensure_config_map(NameVsn); _ -> - ?SLOG(info, #{msg => "config_not_found_from_cluster"}), - %% otherwise cp default avro file + ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}), + %% otherwise cp default hocon file %% i.e. Clean installation - cp_default_config_file(NameVsn), - ensure_avro_config(NameVsn) + cp_default_config_file(NameVsn) end. cp_default_config_file(NameVsn) -> - %% always copy default avro file into config dir - %% when can not get config from other nodes + %% always copy default hocon file into config dir when can not get config from other nodes Source = default_plugin_config_file(NameVsn), Destination = plugin_config_file(NameVsn), maybe @@ -1194,40 +1189,39 @@ cp_default_config_file(NameVsn) -> ok; {error, Reason} -> ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", + msg => "failed_to_copy_plugin_default_hocon_config", source => Source, destination => Destination, reason => Reason }) end else - _ -> ensure_avro_config(NameVsn) + _ -> ensure_config_map(NameVsn) end. -ensure_avro_config(NameVsn) -> +ensure_config_map(NameVsn) -> with_plugin_avsc(NameVsn) andalso - do_ensure_avro_config(NameVsn). + do_ensure_config_map(NameVsn). -do_ensure_avro_config(NameVsn) -> - case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of - {ok, AvroJsonMap} -> - {ok, Config} = decode_plugin_avro_config( - NameVsn, emqx_utils_json:encode(AvroJsonMap) - ), - put_config(NameVsn, AvroJsonMap, Config); +do_ensure_config_map(NameVsn) -> + case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of + {ok, ConfigJsonMap} -> + {ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap), + put_config(NameVsn, ConfigJsonMap, Config); _ -> + ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}), ok end. %% @private Backup the current config to a file with a timestamp suffix and %% then save the new config to the config file. -backup_and_write_avro_bin(NameVsn, AvroBin) -> +backup_and_write_hocon_bin(NameVsn, HoconBin) -> %% this may fail, but we don't care %% e.g. read-only file system Path = plugin_config_file(NameVsn), _ = filelib:ensure_dir(Path), TmpFile = Path ++ ".tmp", - case file:write_file(TmpFile, AvroBin) of + case file:write_file(TmpFile, HoconBin) of ok -> backup_and_replace(Path, TmpFile); {error, Reason} -> @@ -1313,7 +1307,7 @@ plugin_dir(NameVsn) -> plugin_priv_dir(NameVsn) -> case read_plugin_info(NameVsn, #{fill_readme => false}) of {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> - AppDir = <>, + AppDir = make_name_vsn_string(Name, Vsn), wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); _ -> wrap_list_path(filename:join([install_dir(), NameVsn, "priv"])) diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 0e933d351..fa91a8512 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -27,8 +27,6 @@ -include("emqx_plugins.hrl"). -include_lib("emqx/include/bpapi.hrl"). --type name_vsn() :: binary() | string(). - introduced_in() -> "5.7.0".