From 71cdcc860aba12e4ab95e30e1d4f8b53999fda92 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 17 Apr 2024 16:22:08 +0800 Subject: [PATCH 01/22] fix(plugin): plugin's mgmt api schema codes --- apps/emqx_management/src/emqx_mgmt_api_plugins.erl | 14 +++++++------- .../test/emqx_mgmt_api_plugins_SUITE.erl | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 9e05d39a7..49905540d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -105,7 +105,7 @@ schema("/plugins/install") -> } }, responses => #{ - 200 => <<"OK">>, + 204 => <<"Install plugin successfully">>, 400 => emqx_dashboard_swagger:error_codes( ['UNEXPECTED_ERROR', 'ALREADY_INSTALLED', 'BAD_PLUGIN_INFO'] ) @@ -117,7 +117,7 @@ schema("/plugins/:name") -> 'operationId' => plugin, get => #{ summary => <<"Get a plugin description">>, - description => "Describs plugin according to its `release.json` and `README.md`.", + description => "Describe a plugin according to its `release.json` and `README.md`.", tags => ?TAGS, parameters => [hoconsc:ref(name)], responses => #{ @@ -152,7 +152,7 @@ schema("/plugins/:name/:action") -> {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})} ], responses => #{ - 200 => <<"OK">>, + 204 => <<"Trigger action successfully">>, 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } } @@ -161,13 +161,13 @@ schema("/plugins/:name/move") -> #{ 'operationId' => update_boot_order, post => #{ - summary => <<"Move plugin within plugin hiearchy">>, + summary => <<"Move plugin within plugin hierarchy">>, description => "Setting the boot order of plugins.", tags => ?TAGS, parameters => [hoconsc:ref(name)], 'requestBody' => move_request_body(), responses => #{ - 200 => <<"OK">>, + 204 => <<"Boot order changed successfully">>, 400 => emqx_dashboard_swagger:error_codes(['MOVE_FAILED'], <<"Move failed">>) } } @@ -382,7 +382,7 @@ do_install_package(FileName, Bin) -> {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin), case lists:filter(fun(R) -> R =/= ok end, Res) of [] -> - {200}; + {204}; Filtered -> %% crash if we have unexpected errors or results [] = lists:filter( @@ -425,7 +425,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> Position -> case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of ok -> - {200}; + {204}; {error, Reason} -> {400, #{ code => 'MOVE_FAILED', diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl index 8106afc4a..4e5dacc7a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl @@ -271,7 +271,7 @@ install_plugin(FilePath) -> Token ) of - {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok; + {ok, {{"HTTP/1.1", 204, "No Content"}, _Headers, <<>>}} -> ok; Error -> Error end. @@ -288,7 +288,7 @@ install_plugin(Config, FilePath) -> Auth ) of - {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok; + {ok, {{"HTTP/1.1", 204, "No Content"}, _Headers, <<>>}} -> ok; Error -> Error end. From 8db5e515926184fa656008ded224cec71c76c2dd Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 19 Apr 2024 16:14:59 +0800 Subject: [PATCH 02/22] feat: plugin config with avro schema and apis --- apps/emqx/priv/bpapi.versions | 1 + .../src/emqx_mgmt_api_plugins.erl | 162 +++- .../proto/emqx_mgmt_api_plugins_proto_v2.erl | 1 + .../proto/emqx_mgmt_api_plugins_proto_v3.erl | 65 ++ apps/emqx_plugins/include/emqx_plugins.hrl | 21 + apps/emqx_plugins/src/emqx_plugins.app.src | 2 +- apps/emqx_plugins/src/emqx_plugins.erl | 725 +++++++++++------- apps/emqx_plugins/src/emqx_plugins_serde.erl | 274 +++++++ apps/emqx_plugins/src/emqx_plugins_sup.erl | 12 +- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 24 +- apps/emqx_plugins/test/emqx_plugins_tests.erl | 4 +- 11 files changed, 980 insertions(+), 311 deletions(-) create mode 100644 apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl create mode 100644 apps/emqx_plugins/src/emqx_plugins_serde.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index a2913c37a..10d27fc63 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -46,6 +46,7 @@ {emqx_metrics,2}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. +{emqx_mgmt_api_plugins,3}. {emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,2}. {emqx_mgmt_cluster,3}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 49905540d..63a8ba517 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -19,7 +19,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -%%-include_lib("emqx_plugins/include/emqx_plugins.hrl"). +-include_lib("emqx_plugins/include/emqx_plugins.hrl"). -export([ api_spec/0, @@ -34,6 +34,8 @@ upload_install/2, plugin/2, update_plugin/2, + plugin_config/2, + plugin_schema/2, update_boot_order/2 ]). @@ -43,7 +45,8 @@ install_package/2, delete_package/1, describe_package/1, - ensure_action/2 + ensure_action/2, + do_update_plugin_config/3 ]). -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$"). @@ -52,7 +55,11 @@ %% app_name must be a snake_case (no '-' allowed). -define(VSN_WILDCARD, "-*.tar.gz"). -namespace() -> "plugins". +-define(CONTENT_PLUGIN, plugin). +-define(CONTENT_CONFIG, config). + +namespace() -> + "plugins". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -64,6 +71,8 @@ paths() -> "/plugins/:name", "/plugins/install", "/plugins/:name/:action", + "/plugins/:name/config", + "/plugins/:name/schema", "/plugins/:name/move" ]. @@ -97,10 +106,10 @@ schema("/plugins/install") -> schema => #{ type => object, properties => #{ - plugin => #{type => string, format => binary} + ?CONTENT_PLUGIN => #{type => string, format => binary} } }, - encoding => #{plugin => #{'contentType' => 'application/gzip'}} + encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}} } } }, @@ -157,6 +166,70 @@ schema("/plugins/:name/:action") -> } } }; +schema("/plugins/:name/config") -> + #{ + 'operationId' => plugin_config, + get => #{ + summary => + <<"Get plugin config">>, + description => + "Get plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + responses => #{ + %% binary avro encoded config + 200 => hoconsc:mk(binary()), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + }, + put => #{ + summary => + <<"Update plugin config">>, + description => + "Update plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + 'requestBody' => #{ + content => #{ + 'multipart/form-data' => #{ + schema => #{ + type => object, + properties => #{ + ?CONTENT_CONFIG => #{type => string, format => binary} + } + }, + encoding => #{?CONTENT_CONFIG => #{'contentType' => 'application/gzip'}} + } + } + }, + responses => #{ + 204 => <<"Config updated successfully">>, + 400 => emqx_dashboard_swagger:error_codes( + ['UNEXPECTED_ERROR'], <<"Update plugin config failed">> + ), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + } + }; +schema("/plugins/:name/schema") -> + #{ + 'operationId' => plugin_schema, + get => #{ + summary => <<"Get installed plugin's avro schema">>, + description => + "Get plugin's config avro schema.", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + responses => #{ + %% avro schema and i18n json object + 200 => hoconsc:mk(binary()), + 404 => emqx_dashboard_swagger:error_codes( + ['NOT_FOUND', 'FILE_NOT_EXISTED'], + <<"Plugin Not Found or Plugin not given a schema file">> + ) + } + } + }; schema("/plugins/:name/move") -> #{ 'operationId' => update_boot_order, @@ -338,7 +411,7 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) - %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall NameVsn = string:trim(FileName, trailing, ".tar.gz"), case emqx_plugins:describe(NameVsn) of - {error, #{error := "bad_info_file", return := {enoent, _}}} -> + {error, #{error_msg := "bad_info_file", reason := {enoent, _}}} -> case emqx_plugins:parse_name_vsn(FileName) of {ok, AppName, _Vsn} -> AppDir = filename:join(emqx_plugins:install_dir(), AppName), @@ -394,7 +467,7 @@ do_install_package(FileName, Bin) -> ), Reason = case hd(Filtered) of - {error, #{error := Reason0}} -> Reason0; + {error, #{error_msg := Reason0}} -> Reason0; {error, #{reason := Reason0}} -> Reason0 end, {400, #{ @@ -418,6 +491,42 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), return(204, Res). +plugin_config(get, #{bindings := #{name := Name}}) -> + case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_AVRO}) of + {ok, AvroBin} -> + {200, #{<<"content-type">> => <<"application/octet-stream">>}, AvroBin}; + {error, _} -> + {400, #{ + code => 'BAD_CONFIG', + message => <<"Failed to get plugin config">> + }} + end; +plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawAvro}}) -> + case emqx_plugins:decode_plugin_avro_config(Name, RawAvro) of + {ok, Config} -> + Nodes = emqx:running_nodes(), + _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( + Nodes, Name, RawAvro, Config + ), + {204}; + {error, Reason} -> + {400, #{ + code => 'BAD_CONFIG', + message => readable_error_msg(Reason) + }} + end. + +plugin_schema(get, #{bindings := #{name := NameVsn}}) -> + case emqx_plugins:describe(NameVsn) of + {ok, _Plugin} -> + {200, format_plugin_schema_with_i18n(NameVsn)}; + _ -> + {404, #{ + code => 'NOT_FOUND', + message => <<"Plugin Not Found">> + }} + end. + update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> case parse_position(Body, Name) of {error, Reason} -> @@ -429,7 +538,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> {error, Reason} -> {400, #{ code => 'MOVE_FAILED', - message => iolist_to_binary(io_lib:format("~p", [Reason])) + message => readable_error_msg(Reason) }} end end. @@ -443,7 +552,7 @@ install_package(FileName, Bin) -> ok = file:write_file(File, Bin), PackageName = string:trim(FileName, trailing, ".tar.gz"), case emqx_plugins:ensure_installed(PackageName) of - {error, #{return := not_found}} = NotFound -> + {error, #{reason := not_found}} = NotFound -> NotFound; {error, Reason} = Error -> ?SLOG(error, Reason#{msg => "failed_to_install_plugin"}), @@ -454,9 +563,9 @@ install_package(FileName, Bin) -> end. %% For RPC plugin get -describe_package(Name) -> +describe_package(NameVsn) -> Node = node(), - case emqx_plugins:describe(Name) of + case emqx_plugins:describe(NameVsn) of {ok, Plugin} -> {Node, [Plugin]}; _ -> {Node, []} end. @@ -487,12 +596,25 @@ ensure_action(Name, restart) -> _ = emqx_plugins:restart(Name), ok. +%% for RPC plugin avro encoded config update +do_update_plugin_config(Name, Avro, PluginConfig) -> + emqx_plugins:put_plugin_config(Name, Avro, PluginConfig). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + return(Code, ok) -> {Code}; -return(_, {error, #{error := "bad_info_file", return := {enoent, _} = Reason}}) -> - {404, #{code => 'NOT_FOUND', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}; +return(_, {error, #{error_msg := "bad_info_file", reason := {enoent, _} = Reason}}) -> + {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}}; +return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = Reason}}) -> + {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}}; return(_, {error, Reason}) -> - {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. + {400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}. + +readable_error_msg(Msg) -> + emqx_utils:readable_error_msg(Msg). parse_position(#{<<"position">> := <<"front">>}, _) -> front; @@ -563,6 +685,18 @@ aggregate_status([{Node, Plugins} | List], Acc) -> ), aggregate_status(List, NewAcc). +format_plugin_schema_with_i18n(NameVsn) -> + #{ + avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + }. + +try_read_file(Fun) -> + case Fun() of + {ok, Bin} -> Bin; + _ -> null + end. + % running_status: running loaded, stopped %% config_status: not_configured disable enable plugin_status(#{running_status := running}) -> running; diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl index 781096af0..91d4674f9 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl @@ -24,6 +24,7 @@ describe_package/2, delete_package/1, ensure_action/2 + %% plugin_config/2 ]). -include_lib("emqx/include/bpapi.hrl"). diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl new file mode 100644 index 000000000..13c13bae7 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_plugins/1, + install_package/3, + describe_package/2, + delete_package/1, + ensure_action/2, + update_plugin_config/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec get_plugins([node()]) -> emqx_rpc:multicall_result(). +get_plugins(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000). + +-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result(). +install_package(Nodes, Filename, Bin) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000). + +-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result(). +describe_package(Nodes, Name) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000). + +-spec delete_package(binary() | string()) -> ok | {error, any()}. +delete_package(Name) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). + +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}. +ensure_action(Name, Action) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). + +-spec update_plugin_config( + [node()], + binary() | string(), + binary(), + map() +) -> + emqx_rpc:multicall_result(). +update_plugin_config(Nodes, Name, RawAvro, PluginConfig) -> + rpc:multicall( + Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, [Name, RawAvro, PluginConfig], 10000 + ). diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index 05959e46f..95dc50e4f 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -19,4 +19,25 @@ -define(CONF_ROOT, plugins). +-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab). + +-define(CONFIG_FORMAT_AVRO, config_format_avro). +-define(CONFIG_FORMAT_MAP, config_format_map). + +-type schema_name() :: binary(). +-type avsc() :: binary(). + +-type encoded_data() :: iodata(). +-type decoded_data() :: map(). + +-record(plugin_schema_serde, { + name :: schema_name(), + eval_context :: term(), + %% TODO: fields to mark schema import status + %% scheam_imported :: boolean(), + %% for future use + extra = [] +}). +-type plugin_schema_serde() :: #plugin_schema_serde{}. + -endif. diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index b26836475..0e0945c7f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.1.8"}, + {vsn, "0.2.0"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index ebe5b7932..ce66fc94d 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -16,7 +16,6 @@ -module(emqx_plugins). --include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl"). -include("emqx_plugins.hrl"). @@ -24,6 +23,15 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +-export([ + describe/1, + plugin_avsc/1, + plugin_i18n/1, + plugin_avro/1, + parse_name_vsn/1 +]). + +%% Package operations -export([ ensure_installed/1, ensure_uninstalled/1, @@ -35,21 +43,26 @@ delete_package/1 ]). +%% Plugin runtime management -export([ ensure_started/0, ensure_started/1, ensure_stopped/0, ensure_stopped/1, + get_plugin_config/1, + get_plugin_config/2, + put_plugin_config/3, restart/1, - list/0, - describe/1, - parse_name_vsn/1 + list/0 ]). +%% Package utils -export([ + decode_plugin_avro_config/2, get_config/2, put_config/2, - get_tar/1 + get_tar/1, + install_dir/0 ]). %% `emqx_config_handler' API @@ -57,21 +70,26 @@ post_config_update/5 ]). -%% internal +%% Internal export -export([do_ensure_started/1]). --export([ - install_dir/0 -]). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. +%% Defines +-define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}). + +%% Types %% "my_plugin-0.1.0" -type name_vsn() :: binary() | string(). %% the parse result of the JSON info file -type plugin() :: map(). +-type schema_json() :: map(). +-type i18n_json() :: map(). +-type avro_binary() :: binary(). +-type plugin_config() :: map(). -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. %%-------------------------------------------------------------------- @@ -80,12 +98,36 @@ %% @doc Describe a plugin. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. -describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}). +describe(NameVsn) -> + read_plugin_info(NameVsn, #{fill_readme => true}). + +-spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}. +plugin_avsc(NameVsn) -> + read_plugin_avsc(NameVsn). + +-spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}. +plugin_i18n(NameVsn) -> + read_plugin_i18n(NameVsn). + +-spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}. +plugin_avro(NameVsn) -> + read_plugin_avro(NameVsn). + +parse_name_vsn(NameVsn) when is_binary(NameVsn) -> + parse_name_vsn(binary_to_list(NameVsn)); +parse_name_vsn(NameVsn) when is_list(NameVsn) -> + case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of + {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn}; + _ -> {error, "bad_name_vsn"} + end. + +%%-------------------------------------------------------------------- +%% Package operations %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, map()}. ensure_installed(NameVsn) -> - case read_plugin(NameVsn, #{}) of + case read_plugin_info(NameVsn, #{}) of {ok, _} -> ok; {error, _} -> @@ -93,33 +135,183 @@ ensure_installed(NameVsn) -> do_ensure_installed(NameVsn) end. -do_ensure_installed(NameVsn) -> - TarGz = pkg_file(NameVsn), - case erl_tar:extract(TarGz, [compressed, memory]) of - {ok, TarContent} -> - ok = write_tar_file_content(install_dir(), TarContent), - case read_plugin(NameVsn, #{}) of - {ok, _} -> - ok; - {error, Reason} -> - ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), - ok = delete_tar_file_content(install_dir(), TarContent), - {error, Reason} - end; - {error, {_, enoent}} -> +%% @doc Ensure files and directories for the given plugin are being deleted. +%% If a plugin is running, or enabled, an error is returned. +-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. +ensure_uninstalled(NameVsn) -> + case read_plugin_info(NameVsn, #{}) of + {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> {error, #{ - reason => "failed_to_extract_plugin_package", - path => TarGz, - return => not_found + error_msg => "bad_plugin_running_status", + hint => "stop_the_plugin_first" }}; - {error, Reason} -> + {ok, #{config_status := enabled}} -> {error, #{ - reason => "bad_plugin_package", - path => TarGz, - return => Reason - }} + error_msg => "bad_plugin_config_status", + hint => "disable_the_plugin_first" + }}; + _ -> + purge(NameVsn), + ensure_delete(NameVsn) end. +%% @doc Ensure a plugin is enabled to the end of the plugins list. +-spec ensure_enabled(name_vsn()) -> ok | {error, any()}. +ensure_enabled(NameVsn) -> + ensure_enabled(NameVsn, no_move). + +%% @doc Ensure a plugin is enabled at the given position of the plugin list. +-spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}. +ensure_enabled(NameVsn, Position) -> + ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local). + +-spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}. +ensure_enabled(NameVsn, Position, ConfLocation) when + ConfLocation =:= local; ConfLocation =:= global +-> + ensure_state(NameVsn, Position, _Enabled = true, ConfLocation). + +%% @doc Ensure a plugin is disabled. +-spec ensure_disabled(name_vsn()) -> ok | {error, any()}. +ensure_disabled(NameVsn) -> + ensure_state(NameVsn, no_move, false, _ConfLocation = local). + +%% @doc Delete extracted dir +%% In case one lib is shared by multiple plugins. +%% it might be the case that purging one plugin's install dir +%% will cause deletion of loaded beams. +%% It should not be a problem, because shared lib should +%% reside in all the plugin install dirs. +-spec purge(name_vsn()) -> ok. +purge(NameVsn) -> + _ = maybe_purge_plugin_config(NameVsn), + purge_plugin(NameVsn). + +%% @doc Delete the package file. +-spec delete_package(name_vsn()) -> ok. +delete_package(NameVsn) -> + File = pkg_file(NameVsn), + _ = emqx_plugins_serde:delete_schema(NameVsn), + case file:delete(File) of + ok -> + ?SLOG(info, #{msg => "purged_plugin_dir", path => File}), + ok; + {error, enoent} -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_delete_package_file", + path => File, + reason => Reason + }), + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Plugin runtime management + +%% @doc Start all configured plugins are started. +-spec ensure_started() -> ok. +ensure_started() -> + ok = for_plugins(fun ?MODULE:do_ensure_started/1). + +%% @doc Start a plugin from Management API or CLI. +%% the input is a - string. +-spec ensure_started(name_vsn()) -> ok | {error, term()}. +ensure_started(NameVsn) -> + case do_ensure_started(NameVsn) of + ok -> + ok; + {error, Reason} -> + ?SLOG(alert, Reason#{msg => "failed_to_start_plugin"}), + {error, Reason} + end. + +%% @doc Stop all plugins before broker stops. +-spec ensure_stopped() -> ok. +ensure_stopped() -> + for_plugins(fun ?MODULE:ensure_stopped/1). + +%% @doc Stop a plugin from Management API or CLI. +-spec ensure_stopped(name_vsn()) -> ok | {error, term()}. +ensure_stopped(NameVsn) -> + tryit( + "stop_plugin", + fun() -> + Plugin = do_read_plugin(NameVsn), + ensure_apps_stopped(Plugin) + end + ). + +-spec get_plugin_config(name_vsn()) -> + {ok, plugin_config()} | {error, term()}. +get_plugin_config(NameVsn) -> + get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}). + +-spec get_plugin_config(name_vsn(), Options :: map()) -> + {ok, avro_binary() | plugin_config()} + | {error, term()}. +get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> + case read_plugin_avro(NameVsn) of + {ok, _AvroBin} = Res -> Res; + {error, _Reason} = Err -> Err + end; +get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> + persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{}). + +%% @doc Update plugin's config. +%% RPC call from Management API or CLI. +%% the avro binary and plugin config ALWAYS be valid before calling this function. +put_plugin_config(NameVsn, RawAvro, PluginConfig) -> + ok = write_avro_bin(NameVsn, RawAvro), + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), PluginConfig), + ok. + +%% @doc Stop and then start the plugin. +restart(NameVsn) -> + case ensure_stopped(NameVsn) of + ok -> ensure_started(NameVsn); + {error, Reason} -> {error, Reason} + end. + +%% @doc List all installed plugins. +%% Including the ones that are installed, but not enabled in config. +-spec list() -> [plugin()]. +list() -> + Pattern = filename:join([install_dir(), "*", "release.json"]), + All = lists:filtermap( + fun(JsonFilePath) -> + [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)), + case read_plugin_info(NameVsn, #{}) of + {ok, Info} -> + {true, Info}; + {error, Reason} -> + ?SLOG(warning, Reason), + false + end + end, + filelib:wildcard(Pattern) + ), + do_list(configured(), All). + +%%-------------------------------------------------------------------- +%% Package utils + +-spec decode_plugin_avro_config(name_vsn(), binary()) -> {ok, map()} | {error, any()}. +decode_plugin_avro_config(NameVsn, RawAvro) -> + case emqx_plugins_serde:decode(NameVsn, RawAvro) of + {ok, Config} -> {ok, Config}; + {error, ReasonMap} -> {error, ReasonMap} + end. + +get_config(Key, Default) when is_atom(Key) -> + get_config([Key], Default); +get_config(Path, Default) -> + emqx_conf:get([?CONF_ROOT | Path], Default). + +put_config(Key, Value) -> + do_put_config(Key, Value, _ConfLocation = local). + -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}. get_tar(NameVsn) -> TarGz = pkg_file(NameVsn), @@ -135,10 +327,14 @@ get_tar(NameVsn) -> end end. +%%-------------------------------------------------------------------- +%% Internal +%%-------------------------------------------------------------------- + maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) -> maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir)); maybe_create_tar(NameVsn, TarGzName, InstallDir) -> - case filelib:wildcard(filename:join(dir(NameVsn), "**")) of + case filelib:wildcard(filename:join(plugin_dir(NameVsn), "**")) of [_ | _] = PluginFiles -> InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/", PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles], @@ -207,24 +403,32 @@ top_dir_test_() -> ]. -endif. -%% @doc Ensure files and directories for the given plugin are being deleted. -%% If a plugin is running, or enabled, an error is returned. --spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. -ensure_uninstalled(NameVsn) -> - case read_plugin(NameVsn, #{}) of - {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> +do_ensure_installed(NameVsn) -> + TarGz = pkg_file(NameVsn), + case erl_tar:extract(TarGz, [compressed, memory]) of + {ok, TarContent} -> + ok = write_tar_file_content(install_dir(), TarContent), + case read_plugin_info(NameVsn, #{}) of + {ok, _} -> + ok = maybe_post_op_after_install(NameVsn), + ok; + {error, Reason} -> + ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), + ok = delete_tar_file_content(install_dir(), TarContent), + {error, Reason} + end; + {error, {_, enoent}} -> {error, #{ - reason => "bad_plugin_running_status", - hint => "stop_the_plugin_first" + error_msg => "failed_to_extract_plugin_package", + path => TarGz, + reason => not_found }}; - {ok, #{config_status := enabled}} -> + {error, Reason} -> {error, #{ - reason => "bad_plugin_config_status", - hint => "disable_the_plugin_first" - }}; - _ -> - purge(NameVsn), - ensure_delete(NameVsn) + error_msg => "bad_plugin_package", + path => TarGz, + reason => Reason + }} end. ensure_delete(NameVsn0) -> @@ -233,37 +437,19 @@ ensure_delete(NameVsn0) -> put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)), ok. -%% @doc Ensure a plugin is enabled to the end of the plugins list. --spec ensure_enabled(name_vsn()) -> ok | {error, any()}. -ensure_enabled(NameVsn) -> - ensure_enabled(NameVsn, no_move). - -%% @doc Ensure a plugin is enabled at the given position of the plugin list. --spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}. -ensure_enabled(NameVsn, Position) -> - ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local). - --spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}. -ensure_enabled(NameVsn, Position, ConfLocation) when - ConfLocation =:= local; ConfLocation =:= global --> - ensure_state(NameVsn, Position, _Enabled = true, ConfLocation). - -%% @doc Ensure a plugin is disabled. --spec ensure_disabled(name_vsn()) -> ok | {error, any()}. -ensure_disabled(NameVsn) -> - ensure_state(NameVsn, no_move, false, _ConfLocation = local). - ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) -> ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation); ensure_state(NameVsn, Position, State, ConfLocation) -> - case read_plugin(NameVsn, #{}) of + case read_plugin_info(NameVsn, #{}) of {ok, _} -> Item = #{ name_vsn => NameVsn, enable => State }, - tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end); + tryit( + "ensure_state", + fun() -> ensure_configured(Item, Position, ConfLocation) end + ); {error, Reason} -> {error, Reason} end. @@ -295,7 +481,7 @@ add_new_configured(Configured, {Action, NameVsn}, Item) -> {Front, Rear} = lists:splitwith(SplitFun, Configured), Rear =:= [] andalso throw(#{ - error => "position_anchor_plugin_not_configured", + error_msg => "position_anchor_plugin_not_configured", hint => "maybe_install_and_configure", name_vsn => NameVsn }), @@ -307,37 +493,21 @@ add_new_configured(Configured, {Action, NameVsn}, Item) -> Front ++ [Anchor, Item | Rear0] end. -%% @doc Delete the package file. --spec delete_package(name_vsn()) -> ok. -delete_package(NameVsn) -> - File = pkg_file(NameVsn), - case file:delete(File) of - ok -> - ?SLOG(info, #{msg => "purged_plugin_dir", path => File}), - ok; - {error, enoent} -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_delete_package_file", - path => File, - reason => Reason - }), - {error, Reason} - end. +maybe_purge_plugin_config(NameVsn) -> + _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn)), + ok. -%% @doc Delete extracted dir -%% In case one lib is shared by multiple plugins. -%% it might be the case that purging one plugin's install dir -%% will cause deletion of loaded beams. -%% It should not be a problem, because shared lib should -%% reside in all the plugin install dirs. --spec purge(name_vsn()) -> ok. -purge(NameVsn) -> - Dir = dir(NameVsn), +purge_plugin(NameVsn) -> + Dir = plugin_dir(NameVsn), + purge_plugin_dir(Dir). + +purge_plugin_dir(Dir) -> case file:del_dir_r(Dir) of ok -> - ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir}); + ?SLOG(info, #{ + msg => "purged_plugin_dir", + dir => Dir + }); {error, enoent} -> ok; {error, Reason} -> @@ -349,72 +519,10 @@ purge(NameVsn) -> {error, Reason} end. -%% @doc Start all configured plugins are started. --spec ensure_started() -> ok. -ensure_started() -> - ok = for_plugins(fun ?MODULE:do_ensure_started/1). - -%% @doc Start a plugin from Management API or CLI. -%% the input is a - string. --spec ensure_started(name_vsn()) -> ok | {error, term()}. -ensure_started(NameVsn) -> - case do_ensure_started(NameVsn) of - ok -> - ok; - {error, Reason} -> - ?SLOG(alert, #{ - msg => "failed_to_start_plugin", - reason => Reason - }), - {error, Reason} - end. - -%% @doc Stop all plugins before broker stops. --spec ensure_stopped() -> ok. -ensure_stopped() -> - for_plugins(fun ?MODULE:ensure_stopped/1). - -%% @doc Stop a plugin from Management API or CLI. --spec ensure_stopped(name_vsn()) -> ok | {error, term()}. -ensure_stopped(NameVsn) -> - tryit( - "stop_plugin", - fun() -> - Plugin = do_read_plugin(NameVsn), - ensure_apps_stopped(Plugin) - end - ). - -%% @doc Stop and then start the plugin. -restart(NameVsn) -> - case ensure_stopped(NameVsn) of - ok -> ensure_started(NameVsn); - {error, Reason} -> {error, Reason} - end. - -%% @doc List all installed plugins. -%% Including the ones that are installed, but not enabled in config. --spec list() -> [plugin()]. -list() -> - Pattern = filename:join([install_dir(), "*", "release.json"]), - All = lists:filtermap( - fun(JsonFile) -> - case read_plugin({file, JsonFile}, #{}) of - {ok, Info} -> - {true, Info}; - {error, Reason} -> - ?SLOG(warning, Reason), - false - end - end, - filelib:wildcard(Pattern) - ), - list(configured(), All). - %% Make sure configured ones are ordered in front. -list([], All) -> +do_list([], All) -> All; -list([#{name_vsn := NameVsn} | Rest], All) -> +do_list([#{name_vsn := NameVsn} | Rest], All) -> SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) -> bin([Name, "-", Vsn]) =/= bin(NameVsn) end, @@ -424,9 +532,9 @@ list([#{name_vsn := NameVsn} | Rest], All) -> msg => "configured_plugin_not_installed", name_vsn => NameVsn }), - list(Rest, All); + do_list(Rest, All); {Front, [I | Rear]} -> - [I | list(Rest, Front ++ Rear)] + [I | do_list(Rest, Front ++ Rear)] end. do_ensure_started(NameVsn) -> @@ -439,23 +547,26 @@ do_ensure_started(NameVsn) -> ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ - msg => "plugin_not_found", + error_msg => "plugin_not_found", name_vsn => NameVsn - }) + }), + ok end end ). +%%-------------------------------------------------------------------- + %% try the function, catch 'throw' exceptions as normal 'error' return %% other exceptions with stacktrace logged. tryit(WhichOp, F) -> try F() catch - throw:Reason -> + throw:ReasonMap -> %% thrown exceptions are known errors %% translate to a return value without stacktrace - {error, Reason}; + {error, ReasonMap}; error:Reason:Stacktrace -> %% unexpected errors, log stacktrace ?SLOG(warning, #{ @@ -469,33 +580,44 @@ tryit(WhichOp, F) -> %% read plugin info from the JSON file %% returns {ok, Info} or {error, Reason} -read_plugin(NameVsn, Options) -> +read_plugin_info(NameVsn, Options) -> tryit( - "read_plugin_info", - fun() -> {ok, do_read_plugin(NameVsn, Options)} end + atom_to_list(?FUNCTION_NAME), + fun() -> {ok, do_read_plugin2(NameVsn, Options)} end ). -do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}). +do_read_plugin(NameVsn) -> + do_read_plugin2(NameVsn, #{}). -do_read_plugin({file, InfoFile}, Options) -> - [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)), - case hocon:load(InfoFile, #{format => richmap}) of - {ok, RichMap} -> - Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), - Info1 = plugins_readme(NameVsn, Options, Info0), - plugin_status(NameVsn, Info1); - {error, Reason} -> - throw(#{ - error => "bad_info_file", - path => InfoFile, - return => Reason - }) - end; -do_read_plugin(NameVsn, Options) -> - do_read_plugin({file, info_file(NameVsn)}, Options). +do_read_plugin2(NameVsn, Option) -> + do_read_plugin3(NameVsn, info_file(NameVsn), Option). + +do_read_plugin3(NameVsn, InfoFilePath, Options) -> + {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file"))(), + Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), + Info1 = plugins_readme(NameVsn, Options, Info0), + plugin_status(NameVsn, Info1). + +read_plugin_avsc(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(schema_file(NameVsn), "bad_avsc_file") + ). + +read_plugin_i18n(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(i18n_file(NameVsn), "bad_i18n_file") + ). + +read_plugin_avro(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(schema_file(NameVsn), "bad_avro_file") + ). ensure_exists_and_installed(NameVsn) -> - case filelib:is_dir(dir(NameVsn)) of + case filelib:is_dir(plugin_dir(NameVsn)) of true -> ok; false -> @@ -581,10 +703,6 @@ plugin_status(NameVsn, Info) -> config_status => ConfSt }. -bin(A) when is_atom(A) -> atom_to_binary(A, utf8); -bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); -bin(B) when is_binary(B) -> B. - check_plugin( #{ <<"name">> := Name, @@ -593,7 +711,7 @@ check_plugin( <<"description">> := _ } = Info, NameVsn, - File + FilePath ) -> case bin(NameVsn) =:= bin([Name, "-", Vsn]) of true -> @@ -605,7 +723,7 @@ check_plugin( catch _:_ -> throw(#{ - error => "bad_rel_apps", + error_msg => "bad_rel_apps", rel_apps => Apps, hint => "A non-empty string list of app_name-app_vsn format" }) @@ -613,16 +731,16 @@ check_plugin( Info; false -> throw(#{ - error => "name_vsn_mismatch", + error_msg => "name_vsn_mismatch", name_vsn => NameVsn, - path => File, + path => FilePath, name => Name, rel_vsn => Vsn }) end; check_plugin(_What, NameVsn, File) -> throw(#{ - error => "bad_info_file_content", + error_msg => "bad_info_file_content", mandatory_fields => [rel_vsn, name, rel_apps, description], name_vsn => NameVsn, path => File @@ -678,7 +796,7 @@ do_load_plugin_app(AppName, Ebin) -> ok; {error, Reason} -> throw(#{ - error => "failed_to_load_plugin_beam", + error_msg => "failed_to_load_plugin_beam", path => BeamFile, reason => Reason }) @@ -693,7 +811,7 @@ do_load_plugin_app(AppName, Ebin) -> ok; {error, Reason} -> throw(#{ - error => "failed_to_load_plugin_app", + error_msg => "failed_to_load_plugin_app", name => AppName, reason => Reason }) @@ -710,7 +828,7 @@ start_app(App) -> ok; {error, {ErrApp, Reason}} -> throw(#{ - error => "failed_to_start_plugin_app", + error_msg => "failed_to_start_plugin_app", app => App, err_app => ErrApp, reason => Reason @@ -775,7 +893,7 @@ stop_app(App) -> ?SLOG(debug, #{msg => "plugin_not_started", app => App}), ok = unload_moudle_and_app(App); {error, Reason} -> - throw(#{error => "failed_to_stop_app", app => App, reason => Reason}) + throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason}) end. unload_moudle_and_app(App) -> @@ -802,94 +920,22 @@ is_needed_by(AppToStop, RunningApp) -> undefined -> false end. -put_config(Key, Value) -> - put_config(Key, Value, _ConfLocation = local). - -put_config(Key, Value, ConfLocation) when is_atom(Key) -> - put_config([Key], Value, ConfLocation); -put_config(Path, Values, _ConfLocation = local) when is_list(Path) -> +do_put_config(Key, Value, ConfLocation) when is_atom(Key) -> + do_put_config([Key], Value, ConfLocation); +do_put_config(Path, Values, _ConfLocation = local) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, %% Already in cluster_rpc, don't use emqx_conf:update, dead calls case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end; -put_config(Path, Values, _ConfLocation = global) when is_list(Path) -> +do_put_config(Path, Values, _ConfLocation = global) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end. -bin_key(Map) when is_map(Map) -> - maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map); -bin_key(List = [#{} | _]) -> - lists:map(fun(M) -> bin_key(M) end, List); -bin_key(Term) -> - Term. - -get_config(Key, Default) when is_atom(Key) -> - get_config([Key], Default); -get_config(Path, Default) -> - emqx_conf:get([?CONF_ROOT | Path], Default). - -install_dir() -> get_config(install_dir, ""). - -put_configured(Configured) -> - put_configured(Configured, _ConfLocation = local). - -put_configured(Configured, ConfLocation) -> - ok = put_config(states, bin_key(Configured), ConfLocation). - -configured() -> - get_config(states, []). - -for_plugins(ActionFun) -> - case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of - [] -> ok; - Errors -> erlang:error(#{function => ActionFun, errors => Errors}) - end. - -for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> - case Fun(NameVsn) of - ok -> []; - {error, Reason} -> [{NameVsn, Reason}] - end; -for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> - ?SLOG(debug, #{ - msg => "plugin_disabled", - name_vsn => NameVsn - }), - []. - -parse_name_vsn(NameVsn) when is_binary(NameVsn) -> - parse_name_vsn(binary_to_list(NameVsn)); -parse_name_vsn(NameVsn) when is_list(NameVsn) -> - case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of - {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn}; - _ -> {error, "bad_name_vsn"} - end. - -pkg_file(NameVsn) -> - filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). - -dir(NameVsn) -> - filename:join([install_dir(), NameVsn]). - -info_file(NameVsn) -> - filename:join([dir(NameVsn), "release.json"]). - -readme_file(NameVsn) -> - filename:join([dir(NameVsn), "README.md"]). - -running_apps() -> - lists:map( - fun({N, _, V}) -> - {N, V} - end, - application:which_applications(infinity) - ). - %%-------------------------------------------------------------------- %% `emqx_config_handler' API %%-------------------------------------------------------------------- @@ -913,3 +959,120 @@ enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) -> ok; enable_disable_plugin(_NameVsn, _Diff) -> ok. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +install_dir() -> + get_config(install_dir, ""). + +put_configured(Configured) -> + put_configured(Configured, _ConfLocation = local). + +put_configured(Configured, ConfLocation) -> + ok = do_put_config(states, bin_key(Configured), ConfLocation). + +configured() -> + get_config(states, []). + +for_plugins(ActionFun) -> + case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of + [] -> ok; + Errors -> erlang:error(#{function => ActionFun, errors => Errors}) + end. + +for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> + case Fun(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; +for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> + ?SLOG(debug, #{ + msg => "plugin_disabled", + name_vsn => NameVsn + }), + []. + +maybe_post_op_after_install(NameVsn) -> + _ = maybe_load_config_schema(NameVsn), + _ = maybe_create_config_dir(NameVsn), + ok. + +maybe_load_config_schema(NameVsn) -> + case read_plugin_avsc(NameVsn) of + {ok, Avsc} -> + case emqx_plugins_serde:add_schema(NameVsn, Avsc) of + ok -> ok; + {error, already_exists} -> ok; + {error, Reason} -> {error, Reason} + end; + {error, Reason} -> + ?SLOG(warning, Reason) + end. + +maybe_create_config_dir(NameVsn) -> + case filelib:ensure_path(plugin_config_dir(NameVsn)) of + ok -> ok; + {error, Reason} -> ?SLOG(warning, Reason) + end. + +write_avro_bin(NameVsn, AvroBin) -> + ok = file:write_file(avro_config_file(NameVsn), AvroBin). + +read_file_fun(Path, ErrMsg) -> + fun() -> + case hocon:load(Path, #{format => richmap}) of + {ok, RichMap} -> + {ok, hocon_maps:ensure_plain(RichMap)}; + {error, Reason} -> + ErrMeta = #{error_msg => ErrMsg, reason => Reason}, + ?SLOG(warning, ErrMeta), + throw(ErrMeta) + end + end. + +%% Directorys +plugin_dir(NameVsn) -> + filename:join([install_dir(), NameVsn]). + +plugin_config_dir(NameVsn) -> + filename:join([plugin_dir(NameVsn), "data", "configs"]). + +%% Files +pkg_file(NameVsn) -> + filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). + +info_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "release.json"]). + +schema_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). + +avro_config_file(NameVsn) -> + filename:join([plugin_config_dir(NameVsn), "config.avro"]). + +i18n_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "i18n.json"]). + +readme_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "README.md"]). + +running_apps() -> + lists:map( + fun({N, _, V}) -> + {N, V} + end, + application:which_applications(infinity) + ). + +bin_key(Map) when is_map(Map) -> + maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map); +bin_key(List = [#{} | _]) -> + lists:map(fun(M) -> bin_key(M) end, List); +bin_key(Term) -> + Term. + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); +bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl new file mode 100644 index 000000000..a89f16e70 --- /dev/null +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -0,0 +1,274 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_serde). + +-include("emqx_plugins.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([ + start_link/0, + get_serde/1, + add_schema/2, + get_schema/1, + delete_schema/1 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_continue/2, + terminate/2 +]). + +-export([ + decode/2, + encode/2 +]). + +%%------------------------------------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec get_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}. +get_serde(SchemaName) -> + case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of + [] -> + {error, not_found}; + [Serde] -> + {ok, Serde} + end. + +-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. +add_schema(Name, Avsc) -> + case get_serde(Name) of + {ok, _Serde} -> + ?SLOG(warning, #{msg => "plugin_avsc_schema_already_exists", name_vsn => Name}), + {error, already_exists}; + {error, not_found} -> + case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}) of + ok -> + ?SLOG(debug, #{msg => "plugin_avsc_schema_added", name_vsn => Name}), + ok; + {error, Reason} = E -> + ?SLOG(error, #{ + msg => "plugin_avsc_schema_added_failed", + reason => emqx_utils:readable_error_msg(Reason) + }), + E + end + end. + +get_schema(NameVsn) -> + Path = emqx_plugins:schema_file(NameVsn), + case read_avsc_file(Path) of + {ok, Avsc} -> + {ok, Avsc}; + {error, Reason} -> + ?SLOG(warning, Reason), + {error, Reason} + end. + +-spec delete_schema(schema_name()) -> ok | {error, term()}. +delete_schema(NameVsn) -> + case get_serde(NameVsn) of + {ok, _Serde} -> + async_delete_serdes([NameVsn]), + ok; + {error, not_found} -> + {error, not_found} + end. + +-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}. +decode(SerdeName, RawData) -> + with_serde( + "decode_avro_binary", + eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData]) + ). + +-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}. +encode(SerdeName, Data) -> + with_serde( + "encode_avro_data", + eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data]) + ). + +%%------------------------------------------------------------------------------------------------- +%% `gen_server' API +%%------------------------------------------------------------------------------------------------- + +init(_) -> + process_flag(trap_exit, true), + ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, [ + public, ordered_set, {keypos, #plugin_schema_serde.name} + ]), + State = #{}, + SchemasMap = read_plugin_avsc(), + {ok, State, {continue, {build_serdes, SchemasMap}}}. + +handle_continue({build_serdes, SchemasMap}, State) -> + _ = build_serdes(SchemasMap), + {noreply, State}. + +handle_call({build_serdes, {NameVsn, Avsc}}, _From, State) -> + BuildRes = do_build_serde(NameVsn, Avsc), + {reply, BuildRes, State}; +handle_call(_Call, _From, State) -> + {reply, {error, unknown_call}, State}. + +handle_cast({delete_serdes, Names}, State) -> + lists:foreach(fun ensure_serde_absent/1, Names), + {noreply, State}; +handle_cast(_Cast, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +read_plugin_avsc() -> + Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), + lists:foldl( + fun(AvscPath, AccIn) -> + case read_avsc_file(AvscPath) of + {ok, Avsc} -> + [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + AccIn#{to_bin(NameVsn) => Avsc}; + {error, Reason} -> + ?SLOG(warning, Reason), + AccIn + end + end, + _Acc0 = #{}, + filelib:wildcard(Pattern) + ). + +build_serdes(Schemas) -> + maps:foreach(fun do_build_serde/2, Schemas). + +do_build_serde(NameVsn, Avsc) -> + try + Serde = make_serde(NameVsn, Avsc), + true = ets:insert(?PLUGIN_SERDE_TAB, Serde), + ok + catch + Kind:Error:Stacktrace -> + ?SLOG( + error, + #{ + msg => "error_building_plugin_schema_serde", + name => NameVsn, + kind => Kind, + error => Error, + stacktrace => Stacktrace + } + ), + {error, Error} + end. + +make_serde(NameVsn, Avsc) -> + Store0 = avro_schema_store:new([map]), + %% import the schema into the map store with an assigned name + %% if it's a named schema (e.g. struct), then Name is added as alias + Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0), + #plugin_schema_serde{ + name = NameVsn, + eval_context = Store + }. + +ensure_serde_absent(Name) when not is_binary(Name) -> + ensure_serde_absent(to_bin(Name)); +ensure_serde_absent(Name) -> + case get_serde(Name) of + {ok, _Serde} -> + _ = ets:delete(?PLUGIN_SERDE_TAB, Name), + ok; + {error, not_found} -> + ok + end. + +async_delete_serdes(Names) -> + gen_server:cast(?MODULE, {delete_serdes, Names}). + +with_serde(WhichOp, Fun) -> + try + Fun() + catch + throw:Reason -> + ?SLOG(error, Reason#{ + which_op => WhichOp, + reason => emqx_utils:readable_error_msg(Reason) + }), + {error, Reason}; + error:Reason:Stacktrace -> + %% unexpected errors, log stacktrace + ?SLOG(warning, #{ + msg => "plugin_schema_op_failed", + which_op => WhichOp, + exception => Reason, + stacktrace => Stacktrace + }), + {error, #{ + which_op => WhichOp, + reason => Reason + }} + end. + +eval_serde_fun(Op, ErrMsg, SerdeName, Args) -> + fun() -> + case get_serde(SerdeName) of + {ok, Serde} -> + eval_serde(Op, Serde, Args); + {error, not_found} -> + throw(#{ + error_msg => ErrMsg, + reason => plugin_serde_not_found, + serde_name => SerdeName + }) + end + end. + +eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> + Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), + {ok, avro_binary_decoder:decode(Data, Name, Store, Opts)}; +eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> + {ok, avro_binary_encoder:encode(Store, Name, Data)}; +eval_serde(_, _, _) -> + throw(#{error_msg => "unexpected_plugin_avro_op"}). + +read_avsc_file(Path) -> + case file:read_file(Path) of + {ok, Bin} -> + {ok, Bin}; + {error, _} -> + {error, #{ + error => "failed_to_read_plugin_schema", + path => Path + }} + end. + +to_bin(A) when is_atom(A) -> atom_to_binary(A); +to_bin(L) when is_list(L) -> iolist_to_binary(L); +to_bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_plugins/src/emqx_plugins_sup.erl b/apps/emqx_plugins/src/emqx_plugins_sup.erl index f2092fb28..9f396c14d 100644 --- a/apps/emqx_plugins/src/emqx_plugins_sup.erl +++ b/apps/emqx_plugins/src/emqx_plugins_sup.erl @@ -32,4 +32,14 @@ init([]) -> intensity => 100, period => 10 }, - {ok, {SupFlags, []}}. + ChildSpecs = [child_spec(emqx_plugins_serde)], + {ok, {SupFlags, ChildSpecs}}. + +child_spec(Mod) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5_000, + type => worker + }. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 0f66e20dc..d7bdfad13 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -346,7 +346,7 @@ t_enable_disable(Config) -> ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()), ?assertMatch( {error, #{ - reason := "bad_plugin_config_status", + error_msg := "bad_plugin_config_status", hint := "disable_the_plugin_first" }}, emqx_plugins:ensure_uninstalled(NameVsn) @@ -374,15 +374,15 @@ t_bad_tar_gz(Config) -> ok = file:write_file(FakeTarTz, "a\n"), ?assertMatch( {error, #{ - reason := "bad_plugin_package", - return := eof + error_msg := "bad_plugin_package", + reason := eof }}, emqx_plugins:ensure_installed("fake-vsn") ), ?assertMatch( {error, #{ - reason := "failed_to_extract_plugin_package", - return := not_found + error_msg := "failed_to_extract_plugin_package", + reason := not_found }}, emqx_plugins:ensure_installed("nonexisting") ), @@ -412,7 +412,7 @@ t_bad_tar_gz2(Config) -> ?assert(filelib:is_regular(TarGz)), %% failed to install, it also cleans up the bad content of .tar.gz file ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))), %% but the tar.gz file is still around ?assert(filelib:is_regular(TarGz)), ok. @@ -440,8 +440,8 @@ t_tar_vsn_content_mismatch(Config) -> %% failed to install, it also cleans up content of the bad .tar.gz file even %% if in other directory ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir("foo-0.2"))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir("foo-0.2"))), %% the tar.gz file is still around ?assert(filelib:is_regular(TarGz)), ok. @@ -455,15 +455,15 @@ t_bad_info_json(Config) -> ok = write_info_file(Config, NameVsn, "bad-syntax"), ?assertMatch( {error, #{ - error := "bad_info_file", - return := {parse_error, _} + error_msg := "bad_info_file", + reason := {parse_error, _} }}, emqx_plugins:describe(NameVsn) ), ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"), ?assertMatch( {error, #{ - error := "bad_info_file_content", + error_msg := "bad_info_file_content", mandatory_fields := _ }}, emqx_plugins:describe(NameVsn) @@ -499,7 +499,7 @@ t_elixir_plugin(Config) -> ok = emqx_plugins:ensure_installed(NameVsn), %% idempotent ok = emqx_plugins:ensure_installed(NameVsn), - {ok, Info} = emqx_plugins:read_plugin(NameVsn, #{}), + {ok, Info} = emqx_plugins:read_plugin_info(NameVsn, #{}), ?assertEqual([Info], emqx_plugins:list()), %% start ok = emqx_plugins:ensure_started(NameVsn), diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index d064cba9d..010d96de9 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -57,7 +57,7 @@ read_plugin_test() -> ok = write_file(InfoFile, FakeInfo), ?assertMatch( {error, #{error := "bad_rel_apps"}}, - emqx_plugins:read_plugin(NameVsn, #{}) + emqx_plugins:read_plugin_info(NameVsn, #{}) ) after emqx_plugins:purge(NameVsn) @@ -109,7 +109,7 @@ purge_test() -> with_rand_install_dir( fun(_Dir) -> File = emqx_plugins:info_file("a-1"), - Dir = emqx_plugins:dir("a-1"), + Dir = emqx_plugins:plugin_dir("a-1"), ok = filelib:ensure_dir(File), ?assertMatch({ok, _}, file:read_file_info(Dir)), ?assertEqual(ok, emqx_plugins:purge("a-1")), From d06f410fd52dbc792207541a4eeb6bc9fef81a13 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 15:24:38 +0800 Subject: [PATCH 03/22] fix(plugins): read avsc file --- apps/emqx_plugins/src/emqx_plugins.erl | 56 +++++++++++++------ apps/emqx_plugins/src/emqx_plugins_serde.erl | 4 +- apps/emqx_plugins/test/emqx_plugins_tests.erl | 6 +- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index ce66fc94d..6d5a4bcbc 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -81,7 +81,9 @@ %% Defines -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}). -%% Types +-define(RAW_BIN, binary). +-define(JSON_MAP, json_map). + %% "my_plugin-0.1.0" -type name_vsn() :: binary() | string(). %% the parse result of the JSON info file @@ -590,30 +592,36 @@ do_read_plugin(NameVsn) -> do_read_plugin2(NameVsn, #{}). do_read_plugin2(NameVsn, Option) -> - do_read_plugin3(NameVsn, info_file(NameVsn), Option). + do_read_plugin3(NameVsn, info_file_path(NameVsn), Option). do_read_plugin3(NameVsn, InfoFilePath, Options) -> - {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file"))(), + {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(), Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), Info1 = plugins_readme(NameVsn, Options, Info0), plugin_status(NameVsn, Info1). read_plugin_avsc(NameVsn) -> + read_plugin_avsc(NameVsn, #{read_mode => ?JSON_MAP}). +read_plugin_avsc(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(schema_file(NameVsn), "bad_avsc_file") + read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options) ). read_plugin_i18n(NameVsn) -> + read_plugin_i18n(NameVsn, #{read_mode => ?JSON_MAP}). +read_plugin_i18n(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(i18n_file(NameVsn), "bad_i18n_file") + read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options) ). read_plugin_avro(NameVsn) -> + read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}). +read_plugin_avro(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(schema_file(NameVsn), "bad_avro_file") + read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options) ). ensure_exists_and_installed(NameVsn) -> @@ -1000,15 +1008,22 @@ maybe_post_op_after_install(NameVsn) -> ok. maybe_load_config_schema(NameVsn) -> - case read_plugin_avsc(NameVsn) of - {ok, Avsc} -> - case emqx_plugins_serde:add_schema(NameVsn, Avsc) of + filelib:is_regular(avsc_file_path(NameVsn)) andalso + do_load_config_schema(NameVsn). + +do_load_config_schema(NameVsn) -> + case read_plugin_avsc(NameVsn, #{read_mode => ?RAW_BIN}) of + {ok, AvscBin} -> + case emqx_plugins_serde:add_schema(NameVsn, AvscBin) of ok -> ok; {error, already_exists} -> ok; - {error, Reason} -> {error, Reason} + {error, _Reason} -> ok end; {error, Reason} -> - ?SLOG(warning, Reason) + ?SLOG(warning, #{ + msg => "failed_to_read_plugin_avsc", reason => Reason, name_vsn => NameVsn + }), + ok end. maybe_create_config_dir(NameVsn) -> @@ -1020,14 +1035,23 @@ maybe_create_config_dir(NameVsn) -> write_avro_bin(NameVsn, AvroBin) -> ok = file:write_file(avro_config_file(NameVsn), AvroBin). -read_file_fun(Path, ErrMsg) -> +read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) -> + fun() -> + case file:read_file(Path) of + {ok, Bin} -> + {ok, Bin}; + {error, Reason} -> + ErrMeta = #{error_msg => ErrMsg, reason => Reason}, + throw(ErrMeta) + end + end; +read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> fun() -> case hocon:load(Path, #{format => richmap}) of {ok, RichMap} -> {ok, hocon_maps:ensure_plain(RichMap)}; {error, Reason} -> ErrMeta = #{error_msg => ErrMsg, reason => Reason}, - ?SLOG(warning, ErrMeta), throw(ErrMeta) end end. @@ -1043,16 +1067,16 @@ plugin_config_dir(NameVsn) -> pkg_file(NameVsn) -> filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). -info_file(NameVsn) -> +info_file_path(NameVsn) -> filename:join([plugin_dir(NameVsn), "release.json"]). -schema_file(NameVsn) -> +avsc_file_path(NameVsn) -> filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). avro_config_file(NameVsn) -> filename:join([plugin_config_dir(NameVsn), "config.avro"]). -i18n_file(NameVsn) -> +i18n_file_path(NameVsn) -> filename:join([plugin_dir(NameVsn), "i18n.json"]). readme_file(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index a89f16e70..083f9740d 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -79,7 +79,7 @@ add_schema(Name, Avsc) -> end. get_schema(NameVsn) -> - Path = emqx_plugins:schema_file(NameVsn), + Path = emqx_plugins:avsc_file_path(NameVsn), case read_avsc_file(Path) of {ok, Avsc} -> {ok, Avsc}; @@ -129,7 +129,7 @@ handle_continue({build_serdes, SchemasMap}, State) -> _ = build_serdes(SchemasMap), {noreply, State}. -handle_call({build_serdes, {NameVsn, Avsc}}, _From, State) -> +handle_call({build_serdes, NameVsn, Avsc}, _From, State) -> BuildRes = do_build_serde(NameVsn, Avsc), {reply, BuildRes, State}; handle_call(_Call, _From, State) -> diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index 010d96de9..910c30564 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -49,7 +49,7 @@ read_plugin_test() -> with_rand_install_dir( fun(_Dir) -> NameVsn = "bar-5", - InfoFile = emqx_plugins:info_file(NameVsn), + InfoFile = emqx_plugins:info_file_path(NameVsn), FakeInfo = "name=bar, rel_vsn=\"5\", rel_apps=[justname_no_vsn]," "description=\"desc bar\"", @@ -90,7 +90,7 @@ delete_package_test() -> meck_emqx(), with_rand_install_dir( fun(_Dir) -> - File = emqx_plugins:pkg_file("a-1"), + File = emqx_plugins:pkg_file_path("a-1"), ok = write_file(File, "a"), ok = emqx_plugins:delete_package("a-1"), %% delete again should be ok @@ -108,7 +108,7 @@ purge_test() -> meck_emqx(), with_rand_install_dir( fun(_Dir) -> - File = emqx_plugins:info_file("a-1"), + File = emqx_plugins:info_file_path("a-1"), Dir = emqx_plugins:plugin_dir("a-1"), ok = filelib:ensure_dir(File), ?assertMatch({ok, _}, file:read_file_info(Dir)), From d2e0c09f2e0d1117350e8d1cad41907106c08770 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 16:44:14 +0800 Subject: [PATCH 04/22] fix: make static check happy --- .../src/emqx_mgmt_api_plugins.erl | 6 +++--- .../proto/emqx_mgmt_api_plugins_proto_v2.erl | 1 - apps/emqx_plugins/rebar.config | 5 ++++- apps/emqx_plugins/src/emqx_plugins.app.src | 2 +- apps/emqx_plugins/src/emqx_plugins.erl | 17 +++++++++++++---- 5 files changed, 21 insertions(+), 10 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 63a8ba517..ac79c6d84 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -519,7 +519,7 @@ plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawA plugin_schema(get, #{bindings := #{name := NameVsn}}) -> case emqx_plugins:describe(NameVsn) of {ok, _Plugin} -> - {200, format_plugin_schema_with_i18n(NameVsn)}; + {200, format_plugin_avsc_and_i18n(NameVsn)}; _ -> {404, #{ code => 'NOT_FOUND', @@ -685,7 +685,7 @@ aggregate_status([{Node, Plugins} | List], Acc) -> ), aggregate_status(List, NewAcc). -format_plugin_schema_with_i18n(NameVsn) -> +format_plugin_avsc_and_i18n(NameVsn) -> #{ avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) @@ -693,7 +693,7 @@ format_plugin_schema_with_i18n(NameVsn) -> try_read_file(Fun) -> case Fun() of - {ok, Bin} -> Bin; + {ok, Json} -> Json; _ -> null end. diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl index 91d4674f9..781096af0 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl @@ -24,7 +24,6 @@ describe_package/2, delete_package/1, ensure_action/2 - %% plugin_config/2 ]). -include_lib("emqx/include/bpapi.hrl"). diff --git a/apps/emqx_plugins/rebar.config b/apps/emqx_plugins/rebar.config index 9f17b7657..63d56ac72 100644 --- a/apps/emqx_plugins/rebar.config +++ b/apps/emqx_plugins/rebar.config @@ -1,5 +1,8 @@ %% -*- mode: erlang -*- -{deps, [{emqx, {path, "../emqx"}}]}. +{deps, [ + {emqx, {path, "../emqx"}}, + {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}} +]}. {project_plugins, [erlfmt]}. diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index 0e0945c7f..6501d5654 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -4,6 +4,6 @@ {vsn, "0.2.0"}, {modules, []}, {mod, {emqx_plugins_app, []}}, - {applications, [kernel, stdlib, emqx]}, + {applications, [kernel, stdlib, emqx, erlavro]}, {env, []} ]}. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 6d5a4bcbc..7465b5ff1 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -62,7 +62,8 @@ get_config/2, put_config/2, get_tar/1, - install_dir/0 + install_dir/0, + avsc_file_path/1 ]). %% `emqx_config_handler' API @@ -1027,9 +1028,17 @@ do_load_config_schema(NameVsn) -> end. maybe_create_config_dir(NameVsn) -> - case filelib:ensure_path(plugin_config_dir(NameVsn)) of - ok -> ok; - {error, Reason} -> ?SLOG(warning, Reason) + ConfigDir = plugin_config_dir(NameVsn), + case filelib:ensure_path(ConfigDir) of + ok -> + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_create_plugin_config_dir", + dir => ConfigDir, + reason => Reason + }), + {error, {mkdir_failed, ConfigDir, Reason}} end. write_avro_bin(NameVsn, AvroBin) -> From 2686a66b1455297a4f6797ba73b8b844d5b0c676 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 17:33:08 +0800 Subject: [PATCH 05/22] fix: make eunit happy --- apps/emqx_plugins/src/emqx_plugins.erl | 12 ++++++------ apps/emqx_plugins/test/emqx_plugins_tests.erl | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 7465b5ff1..722a2152c 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -193,7 +193,7 @@ purge(NameVsn) -> %% @doc Delete the package file. -spec delete_package(name_vsn()) -> ok. delete_package(NameVsn) -> - File = pkg_file(NameVsn), + File = pkg_file_path(NameVsn), _ = emqx_plugins_serde:delete_schema(NameVsn), case file:delete(File) of ok -> @@ -317,7 +317,7 @@ put_config(Key, Value) -> -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}. get_tar(NameVsn) -> - TarGz = pkg_file(NameVsn), + TarGz = pkg_file_path(NameVsn), case file:read_file(TarGz) of {ok, Content} -> {ok, Content}; @@ -407,7 +407,7 @@ top_dir_test_() -> -endif. do_ensure_installed(NameVsn) -> - TarGz = pkg_file(NameVsn), + TarGz = pkg_file_path(NameVsn), case erl_tar:extract(TarGz, [compressed, memory]) of {ok, TarContent} -> ok = write_tar_file_content(install_dir(), TarContent), @@ -633,7 +633,7 @@ ensure_exists_and_installed(NameVsn) -> %% Do we have the package, but it's not extracted yet? case get_tar(NameVsn) of {ok, TarContent} -> - ok = file:write_file(pkg_file(NameVsn), TarContent), + ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); _ -> %% If not, try to get it from the cluster. @@ -645,7 +645,7 @@ do_get_from_cluster(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], case get_from_any_node(Nodes, NameVsn, []) of {ok, TarContent} -> - ok = file:write_file(pkg_file(NameVsn), TarContent), + ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); {error, NodeErrors} when Nodes =/= [] -> ?SLOG(error, #{ @@ -1073,7 +1073,7 @@ plugin_config_dir(NameVsn) -> filename:join([plugin_dir(NameVsn), "data", "configs"]). %% Files -pkg_file(NameVsn) -> +pkg_file_path(NameVsn) -> filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). info_file_path(NameVsn) -> diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index 910c30564..fb4277c4f 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -56,7 +56,7 @@ read_plugin_test() -> try ok = write_file(InfoFile, FakeInfo), ?assertMatch( - {error, #{error := "bad_rel_apps"}}, + {error, #{error_msg := "bad_rel_apps"}}, emqx_plugins:read_plugin_info(NameVsn, #{}) ) after From 27d1f91cac762aa0482158bbc027d08228f02c58 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 17:36:24 +0800 Subject: [PATCH 06/22] refactor: refine function name --- apps/emqx_plugins/src/emqx_plugins_serde.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index 083f9740d..da8515390 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -22,7 +22,7 @@ %% API -export([ start_link/0, - get_serde/1, + lookup_serde/1, add_schema/2, get_schema/1, delete_schema/1 @@ -49,8 +49,8 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec get_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}. -get_serde(SchemaName) -> +-spec lookup_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}. +lookup_serde(SchemaName) -> case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of [] -> {error, not_found}; @@ -60,7 +60,7 @@ get_serde(SchemaName) -> -spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. add_schema(Name, Avsc) -> - case get_serde(Name) of + case lookup_serde(Name) of {ok, _Serde} -> ?SLOG(warning, #{msg => "plugin_avsc_schema_already_exists", name_vsn => Name}), {error, already_exists}; @@ -90,7 +90,7 @@ get_schema(NameVsn) -> -spec delete_schema(schema_name()) -> ok | {error, term()}. delete_schema(NameVsn) -> - case get_serde(NameVsn) of + case lookup_serde(NameVsn) of {ok, _Serde} -> async_delete_serdes([NameVsn]), ok; @@ -201,7 +201,7 @@ make_serde(NameVsn, Avsc) -> ensure_serde_absent(Name) when not is_binary(Name) -> ensure_serde_absent(to_bin(Name)); ensure_serde_absent(Name) -> - case get_serde(Name) of + case lookup_serde(Name) of {ok, _Serde} -> _ = ets:delete(?PLUGIN_SERDE_TAB, Name), ok; @@ -238,7 +238,7 @@ with_serde(WhichOp, Fun) -> eval_serde_fun(Op, ErrMsg, SerdeName, Args) -> fun() -> - case get_serde(SerdeName) of + case lookup_serde(SerdeName) of {ok, Serde} -> eval_serde(Op, Serde, Args); {error, not_found} -> From c0429ca333ddd7626dd3d00aa0774bad37ca7f09 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 17:38:35 +0800 Subject: [PATCH 07/22] fix(plugin): refine schema serde log --- apps/emqx_plugins/src/emqx_plugins_serde.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index da8515390..5bc5aaa0c 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -62,16 +62,17 @@ lookup_serde(SchemaName) -> add_schema(Name, Avsc) -> case lookup_serde(Name) of {ok, _Serde} -> - ?SLOG(warning, #{msg => "plugin_avsc_schema_already_exists", name_vsn => Name}), + ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => Name}), {error, already_exists}; {error, not_found} -> case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}) of ok -> - ?SLOG(debug, #{msg => "plugin_avsc_schema_added", name_vsn => Name}), + ?SLOG(debug, #{msg => "plugin_schema_added", plugin => Name}), ok; {error, Reason} = E -> ?SLOG(error, #{ - msg => "plugin_avsc_schema_added_failed", + msg => "plugin_schema_add_failed", + plugin => Name, reason => emqx_utils:readable_error_msg(Reason) }), E From 1f00ce789fbb4947069d3471a5be77ac74fdd862 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 17:45:36 +0800 Subject: [PATCH 08/22] fix(plugin): gen_server call timeout infinity --- apps/emqx_plugins/src/emqx_plugins_serde.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index 5bc5aaa0c..726d3c04b 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -65,7 +65,7 @@ add_schema(Name, Avsc) -> ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => Name}), {error, already_exists}; {error, not_found} -> - case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}) of + case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}, infinity) of ok -> ?SLOG(debug, #{msg => "plugin_schema_added", plugin => Name}), ok; From c180b6a417ce8397c154dfcd181ce8d01a6cf02a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Apr 2024 17:48:26 +0800 Subject: [PATCH 09/22] fix(api): plugin api docs --- apps/emqx_management/src/emqx_mgmt_api_plugins.erl | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index ac79c6d84..e065e0301 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -170,10 +170,9 @@ schema("/plugins/:name/config") -> #{ 'operationId' => plugin_config, get => #{ - summary => - <<"Get plugin config">>, + summary => <<"Get plugin config">>, description => - "Get plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + "Get plugin config. Config schema is defined by user's schema.avsc file.
", tags => ?TAGS, parameters => [hoconsc:ref(name)], responses => #{ @@ -186,7 +185,7 @@ schema("/plugins/:name/config") -> summary => <<"Update plugin config">>, description => - "Update plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + "Update plugin config. Config schema defined by user's schema.avsc file.
", tags => ?TAGS, parameters => [hoconsc:ref(name)], 'requestBody' => #{ @@ -215,9 +214,8 @@ schema("/plugins/:name/schema") -> #{ 'operationId' => plugin_schema, get => #{ - summary => <<"Get installed plugin's avro schema">>, - description => - "Get plugin's config avro schema.", + summary => <<"Get installed plugin's AVRO schema">>, + description => "Get plugin's config AVRO schema.", tags => ?TAGS, parameters => [hoconsc:ref(name)], responses => #{ From d2ecccc2ff06011befacfcfa9cac865b54e863b9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 13:40:55 +0800 Subject: [PATCH 10/22] fix: call json encoder/decoder for plugin config --- .../src/emqx_mgmt_api_plugins.erl | 27 +++++++++---------- .../proto/emqx_mgmt_api_plugins_proto_v3.erl | 8 ++++-- apps/emqx_plugins/src/emqx_plugins.erl | 19 +++++++------ apps/emqx_plugins/src/emqx_plugins_serde.erl | 10 +++---- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index e065e0301..c98f1053e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -176,7 +176,7 @@ schema("/plugins/:name/config") -> tags => ?TAGS, parameters => [hoconsc:ref(name)], responses => #{ - %% binary avro encoded config + %% avro data, json encoded 200 => hoconsc:mk(binary()), 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } @@ -190,14 +190,10 @@ schema("/plugins/:name/config") -> parameters => [hoconsc:ref(name)], 'requestBody' => #{ content => #{ - 'multipart/form-data' => #{ + 'application/json' => #{ schema => #{ - type => object, - properties => #{ - ?CONTENT_CONFIG => #{type => string, format => binary} - } - }, - encoding => #{?CONTENT_CONFIG => #{'contentType' => 'application/gzip'}} + type => object + } } } }, @@ -499,12 +495,14 @@ plugin_config(get, #{bindings := #{name := Name}}) -> message => <<"Failed to get plugin config">> }} end; -plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawAvro}}) -> - case emqx_plugins:decode_plugin_avro_config(Name, RawAvro) of - {ok, Config} -> +plugin_config(put, #{bindings := #{name := Name}, body := AvroJsonMap}) -> + AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), + case emqx_plugins:decode_plugin_avro_config(Name, AvroJsonBin) of + {ok, AvroValueConfig} -> Nodes = emqx:running_nodes(), + %% cluster call with config in map (binary key-value) _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( - Nodes, Name, RawAvro, Config + Nodes, Name, AvroJsonMap, AvroValueConfig ), {204}; {error, Reason} -> @@ -595,8 +593,9 @@ ensure_action(Name, restart) -> ok. %% for RPC plugin avro encoded config update -do_update_plugin_config(Name, Avro, PluginConfig) -> - emqx_plugins:put_plugin_config(Name, Avro, PluginConfig). +do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) -> + %% TOOD: maybe use `PluginConfigMap` to validate config + emqx_plugins:put_plugin_config(Name, AvroJsonMap, PluginConfigMap). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl index 13c13bae7..b428a38cc 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl @@ -59,7 +59,11 @@ ensure_action(Name, Action) -> map() ) -> emqx_rpc:multicall_result(). -update_plugin_config(Nodes, Name, RawAvro, PluginConfig) -> +update_plugin_config(Nodes, Name, AvroJsonMap, PluginConfig) -> rpc:multicall( - Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, [Name, RawAvro, PluginConfig], 10000 + Nodes, + emqx_mgmt_api_plugins, + do_update_plugin_config, + [Name, AvroJsonMap, PluginConfig], + 10000 ). diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 722a2152c..0d0dc1fae 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -51,7 +51,7 @@ ensure_stopped/1, get_plugin_config/1, get_plugin_config/2, - put_plugin_config/3, + put_plugin_config/4, restart/1, list/0 ]). @@ -256,7 +256,7 @@ get_plugin_config(NameVsn) -> | {error, term()}. get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> case read_plugin_avro(NameVsn) of - {ok, _AvroBin} = Res -> Res; + {ok, _AvroJson} = Res -> Res; {error, _Reason} = Err -> Err end; get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> @@ -265,9 +265,10 @@ get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> %% @doc Update plugin's config. %% RPC call from Management API or CLI. %% the avro binary and plugin config ALWAYS be valid before calling this function. -put_plugin_config(NameVsn, RawAvro, PluginConfig) -> - ok = write_avro_bin(NameVsn, RawAvro), - ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), PluginConfig), +put_plugin_config(NameVsn, AvroJsonMap, DecodedPluginConfig) -> + AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), + ok = write_avro_bin(NameVsn, AvroJsonBin), + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok. %% @doc Stop and then start the plugin. @@ -300,9 +301,11 @@ list() -> %%-------------------------------------------------------------------- %% Package utils --spec decode_plugin_avro_config(name_vsn(), binary()) -> {ok, map()} | {error, any()}. -decode_plugin_avro_config(NameVsn, RawAvro) -> - case emqx_plugins_serde:decode(NameVsn, RawAvro) of +-spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. +decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> + decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap)); +decode_plugin_avro_config(NameVsn, AvroJsonBin) -> + case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of {ok, Config} -> {ok, Config}; {error, ReasonMap} -> {error, ReasonMap} end. diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index 726d3c04b..b936020a6 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -102,14 +102,14 @@ delete_schema(NameVsn) -> -spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}. decode(SerdeName, RawData) -> with_serde( - "decode_avro_binary", + "decode_avro_json", eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData]) ). -spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}. encode(SerdeName, Data) -> with_serde( - "encode_avro_data", + "encode_avro_json", eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data]) ). @@ -252,10 +252,10 @@ eval_serde_fun(Op, ErrMsg, SerdeName, Args) -> end. eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> - Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), - {ok, avro_binary_decoder:decode(Data, Name, Store, Opts)}; + Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}, {encoding, avro_json}]), + {ok, avro_json_decoder:decode_value(Data, Name, Store, Opts)}; eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> - {ok, avro_binary_encoder:encode(Store, Name, Data)}; + {ok, avro_json_encoder:encode(Store, Name, Data)}; eval_serde(_, _, _) -> throw(#{error_msg => "unexpected_plugin_avro_op"}). From f343cd2021bd8e39b3f07f1dda06daea88e6f288 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 13:43:00 +0800 Subject: [PATCH 11/22] fix: get plugin config in json --- apps/emqx_management/src/emqx_mgmt_api_plugins.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index c98f1053e..0a575befd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -486,9 +486,9 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> return(204, Res). plugin_config(get, #{bindings := #{name := Name}}) -> - case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_AVRO}) of - {ok, AvroBin} -> - {200, #{<<"content-type">> => <<"application/octet-stream">>}, AvroBin}; + case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_MAP}) of + {ok, AvroJson} -> + {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; {error, _} -> {400, #{ code => 'BAD_CONFIG', From b0aa3bb70fe99f9bdd10f13a7a43f39f17aa2ab5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 13:49:54 +0800 Subject: [PATCH 12/22] fix(plugin): get plugin config api --- apps/emqx_plugins/src/emqx_plugins.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 0d0dc1fae..7a73cc72b 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -51,7 +51,7 @@ ensure_stopped/1, get_plugin_config/1, get_plugin_config/2, - put_plugin_config/4, + put_plugin_config/3, restart/1, list/0 ]). @@ -249,7 +249,7 @@ ensure_stopped(NameVsn) -> -spec get_plugin_config(name_vsn()) -> {ok, plugin_config()} | {error, term()}. get_plugin_config(NameVsn) -> - get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}). + get_plugin_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). -spec get_plugin_config(name_vsn(), Options :: map()) -> {ok, avro_binary() | plugin_config()} @@ -260,12 +260,12 @@ get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> {error, _Reason} = Err -> Err end; get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> - persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{}). + {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{})}. %% @doc Update plugin's config. %% RPC call from Management API or CLI. %% the avro binary and plugin config ALWAYS be valid before calling this function. -put_plugin_config(NameVsn, AvroJsonMap, DecodedPluginConfig) -> +put_plugin_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = write_avro_bin(NameVsn, AvroJsonBin), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), From 1869f6fd0a7213ec4a30634da587705f3dd84373 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 14:32:32 +0800 Subject: [PATCH 13/22] fix: enusre plugin installed when do config operation --- .../src/emqx_mgmt_api_plugins.erl | 71 +++++++++++-------- .../proto/emqx_mgmt_api_plugins_proto_v3.erl | 4 +- 2 files changed, 44 insertions(+), 31 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 0a575befd..d308cb42c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -200,7 +200,7 @@ schema("/plugins/:name/config") -> responses => #{ 204 => <<"Config updated successfully">>, 400 => emqx_dashboard_swagger:error_codes( - ['UNEXPECTED_ERROR'], <<"Update plugin config failed">> + ['BAD_CONFIG', 'UNEXPECTED_ERROR'], <<"Update plugin config failed">> ), 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } @@ -485,31 +485,41 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), return(204, Res). -plugin_config(get, #{bindings := #{name := Name}}) -> - case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_MAP}) of - {ok, AvroJson} -> - {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; - {error, _} -> - {400, #{ - code => 'BAD_CONFIG', - message => <<"Failed to get plugin config">> - }} +plugin_config(get, #{bindings := #{name := NameVsn}}) -> + case emqx_plugins:describe(NameVsn) of + {ok, _} -> + case emqx_plugins:get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}) of + {ok, AvroJson} -> + {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; + {error, _} -> + {400, #{ + code => 'BAD_CONFIG', + message => <<"Failed to get plugin config">> + }} + end; + _ -> + {404, plugin_not_found_msg()} end; -plugin_config(put, #{bindings := #{name := Name}, body := AvroJsonMap}) -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - case emqx_plugins:decode_plugin_avro_config(Name, AvroJsonBin) of - {ok, AvroValueConfig} -> - Nodes = emqx:running_nodes(), - %% cluster call with config in map (binary key-value) - _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( - Nodes, Name, AvroJsonMap, AvroValueConfig - ), - {204}; - {error, Reason} -> - {400, #{ - code => 'BAD_CONFIG', - message => readable_error_msg(Reason) - }} +plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> + case emqx_plugins:describe(NameVsn) of + {ok, _} -> + AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), + case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonBin) of + {ok, AvroValueConfig} -> + Nodes = emqx:running_nodes(), + %% cluster call with config in map (binary key-value) + _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( + Nodes, NameVsn, AvroJsonMap, AvroValueConfig + ), + {204}; + {error, Reason} -> + {400, #{ + code => 'BAD_CONFIG', + message => readable_error_msg(Reason) + }} + end; + _ -> + {404, plugin_not_found_msg()} end. plugin_schema(get, #{bindings := #{name := NameVsn}}) -> @@ -517,10 +527,7 @@ plugin_schema(get, #{bindings := #{name := NameVsn}}) -> {ok, _Plugin} -> {200, format_plugin_avsc_and_i18n(NameVsn)}; _ -> - {404, #{ - code => 'NOT_FOUND', - message => <<"Plugin Not Found">> - }} + {404, plugin_not_found_msg()} end. update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> @@ -610,6 +617,12 @@ return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = return(_, {error, Reason}) -> {400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}. +plugin_not_found_msg() -> + #{ + code => 'NOT_FOUND', + message => <<"Plugin Not Found">> + }. + readable_error_msg(Msg) -> emqx_utils:readable_error_msg(Msg). diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl index b428a38cc..641d35f70 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl @@ -59,11 +59,11 @@ ensure_action(Name, Action) -> map() ) -> emqx_rpc:multicall_result(). -update_plugin_config(Nodes, Name, AvroJsonMap, PluginConfig) -> +update_plugin_config(Nodes, NameVsn, AvroJsonMap, PluginConfig) -> rpc:multicall( Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, - [Name, AvroJsonMap, PluginConfig], + [NameVsn, AvroJsonMap, PluginConfig], 10000 ). From c884dfb4515d3915e177b5b9e0f958159c002c35 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 16:42:38 +0800 Subject: [PATCH 14/22] test: make eunit happy --- apps/emqx_plugins/test/emqx_plugins_tests.erl | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index fb4277c4f..4911c174e 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -16,6 +16,7 @@ -module(emqx_plugins_tests). +-include("emqx_plugins.hrl"). -include_lib("eunit/include/eunit.hrl"). -compile(nowarn_export_all). @@ -28,20 +29,20 @@ ensure_configured_test_todo() -> after emqx_plugins:put_configured([]) end, - meck:unload(emqx). + unmeck_emqx(). test_ensure_configured() -> ok = emqx_plugins:put_configured([]), P1 = #{name_vsn => "p-1", enable => true}, P2 = #{name_vsn => "p-2", enable => true}, P3 = #{name_vsn => "p-3", enable => false}, - emqx_plugins:ensure_configured(P1, front), - emqx_plugins:ensure_configured(P2, {before, <<"p-1">>}), - emqx_plugins:ensure_configured(P3, {before, <<"p-1">>}), + emqx_plugins:ensure_configured(P1, front, local), + emqx_plugins:ensure_configured(P2, {before, <<"p-1">>}, local), + emqx_plugins:ensure_configured(P3, {before, <<"p-1">>}, local), ?assertEqual([P2, P3, P1], emqx_plugins:configured()), ?assertThrow( #{error := "position_anchor_plugin_not_configured"}, - emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>}) + emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>}, local) ). read_plugin_test() -> @@ -64,7 +65,7 @@ read_plugin_test() -> end end ), - meck:unload(emqx). + unmeck_emqx(). with_rand_install_dir(F) -> N = rand:uniform(10000000), @@ -100,7 +101,7 @@ delete_package_test() -> ?assertMatch({error, _}, emqx_plugins:delete_package("a-1")) end ), - meck:unload(emqx). + unmeck_emqx(). %% purge plugin's install dir should mostly work and return ok %% but it may fail in case the dir is read-only @@ -120,10 +121,11 @@ purge_test() -> ?assertEqual(ok, emqx_plugins:purge("a-1")) end ), - meck:unload(emqx). + unmeck_emqx(). meck_emqx() -> meck:new(emqx, [unstick, passthrough]), + meck:new(emqx_plugins_serde), meck:expect( emqx, update_config, @@ -131,4 +133,14 @@ meck_emqx() -> emqx_config:put(Path, Values) end ), + meck:expect( + emqx_plugins_serde, + delete_schema, + fun(_NameVsn) -> ok end + ), + ok. + +unmeck_emqx() -> + meck:unload(emqx), + meck:unload(emqx_plugins_serde), ok. From 670ddae57c9fed72b12c92be4eb7327146d2bdcb Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 16:46:14 +0800 Subject: [PATCH 15/22] chore: fix typo --- apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl | 2 +- apps/emqx_management/src/emqx_mgmt_api_plugins.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl index 33c4059df..eaa550d64 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl @@ -72,7 +72,7 @@ dashboard_addr(desc) -> ?DESC(dashboard_addr); dashboard_addr(default) -> <<"https://127.0.0.1:18083">>; dashboard_addr(_) -> undefined. -%% TOOD: support raw xml metadata in hocon (maybe?🤔) +%% TODO: support raw xml metadata in hocon (maybe?🤔) idp_metadata_url(type) -> binary(); idp_metadata_url(desc) -> ?DESC(idp_metadata_url); idp_metadata_url(default) -> <<"https://idp.example.com">>; diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index d308cb42c..9cc518dd0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -601,7 +601,7 @@ ensure_action(Name, restart) -> %% for RPC plugin avro encoded config update do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) -> - %% TOOD: maybe use `PluginConfigMap` to validate config + %% TODO: maybe use `PluginConfigMap` to validate config emqx_plugins:put_plugin_config(Name, AvroJsonMap, PluginConfigMap). %%-------------------------------------------------------------------- From e5bd747b32e4d8f08c0504e3147dd1e6d4d11c93 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Apr 2024 17:01:16 +0800 Subject: [PATCH 16/22] refactor: read avsc file when make serde --- apps/emqx_plugins/src/emqx_plugins.erl | 23 +++------ apps/emqx_plugins/src/emqx_plugins_serde.erl | 52 +++++++++----------- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 7a73cc72b..c4ef79e17 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -1012,22 +1012,15 @@ maybe_post_op_after_install(NameVsn) -> ok. maybe_load_config_schema(NameVsn) -> - filelib:is_regular(avsc_file_path(NameVsn)) andalso - do_load_config_schema(NameVsn). + AvscPath = avsc_file_path(NameVsn), + filelib:is_regular(AvscPath) andalso + do_load_config_schema(NameVsn, AvscPath). -do_load_config_schema(NameVsn) -> - case read_plugin_avsc(NameVsn, #{read_mode => ?RAW_BIN}) of - {ok, AvscBin} -> - case emqx_plugins_serde:add_schema(NameVsn, AvscBin) of - ok -> ok; - {error, already_exists} -> ok; - {error, _Reason} -> ok - end; - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_read_plugin_avsc", reason => Reason, name_vsn => NameVsn - }), - ok +do_load_config_schema(NameVsn, AvscPath) -> + case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of + ok -> ok; + {error, already_exists} -> ok; + {error, _Reason} -> ok end. maybe_create_config_dir(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index b936020a6..00fd04b63 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -59,20 +59,20 @@ lookup_serde(SchemaName) -> end. -spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. -add_schema(Name, Avsc) -> - case lookup_serde(Name) of +add_schema(NameVsn, Path) -> + case lookup_serde(NameVsn) of {ok, _Serde} -> - ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => Name}), + ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}), {error, already_exists}; {error, not_found} -> - case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}, infinity) of + case gen_server:call(?MODULE, {build_serdes, to_bin(NameVsn), Path}, infinity) of ok -> - ?SLOG(debug, #{msg => "plugin_schema_added", plugin => Name}), + ?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}), ok; {error, Reason} = E -> ?SLOG(error, #{ msg => "plugin_schema_add_failed", - plugin => Name, + plugin => NameVsn, reason => emqx_utils:readable_error_msg(Reason) }), E @@ -123,15 +123,15 @@ init(_) -> public, ordered_set, {keypos, #plugin_schema_serde.name} ]), State = #{}, - SchemasMap = read_plugin_avsc(), - {ok, State, {continue, {build_serdes, SchemasMap}}}. + AvscPaths = get_plugin_avscs(), + {ok, State, {continue, {build_serdes, AvscPaths}}}. -handle_continue({build_serdes, SchemasMap}, State) -> - _ = build_serdes(SchemasMap), +handle_continue({build_serdes, AvscPaths}, State) -> + _ = build_serdes(AvscPaths), {noreply, State}. -handle_call({build_serdes, NameVsn, Avsc}, _From, State) -> - BuildRes = do_build_serde(NameVsn, Avsc), +handle_call({build_serdes, NameVsn, AvscPath}, _From, State) -> + BuildRes = do_build_serde({NameVsn, AvscPath}), {reply, BuildRes, State}; handle_call(_Call, _From, State) -> {reply, {error, unknown_call}, State}. @@ -149,29 +149,24 @@ terminate(_Reason, _State) -> %% Internal fns %%------------------------------------------------------------------------------------------------- -read_plugin_avsc() -> +-spec get_plugin_avscs() -> [{string(), string()}]. +get_plugin_avscs() -> Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), lists:foldl( fun(AvscPath, AccIn) -> - case read_avsc_file(AvscPath) of - {ok, Avsc} -> - [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), - AccIn#{to_bin(NameVsn) => Avsc}; - {error, Reason} -> - ?SLOG(warning, Reason), - AccIn - end + [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + [{NameVsn, AvscPath} | AccIn] end, - _Acc0 = #{}, + _Acc0 = [], filelib:wildcard(Pattern) ). -build_serdes(Schemas) -> - maps:foreach(fun do_build_serde/2, Schemas). +build_serdes(AvscPaths) -> + ok = lists:foreach(fun do_build_serde/1, AvscPaths). -do_build_serde(NameVsn, Avsc) -> +do_build_serde({NameVsn, AvscPath}) -> try - Serde = make_serde(NameVsn, Avsc), + Serde = make_serde(NameVsn, AvscPath), true = ets:insert(?PLUGIN_SERDE_TAB, Serde), ok catch @@ -189,11 +184,12 @@ do_build_serde(NameVsn, Avsc) -> {error, Error} end. -make_serde(NameVsn, Avsc) -> +make_serde(NameVsn, AvscPath) -> + {ok, AvscBin} = read_avsc_file(AvscPath), Store0 = avro_schema_store:new([map]), %% import the schema into the map store with an assigned name %% if it's a named schema (e.g. struct), then Name is added as alias - Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0), + Store = avro_schema_store:import_schema_json(NameVsn, AvscBin, Store0), #plugin_schema_serde{ name = NameVsn, eval_context = Store From 28e8131984b32becc0eeabec70597afb3bb31b51 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 10:37:15 +0800 Subject: [PATCH 17/22] refactor: avoid make when do serde --- apps/emqx_plugins/src/emqx_plugins_serde.erl | 44 ++++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index 00fd04b63..fc1321ff1 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -102,15 +102,17 @@ delete_schema(NameVsn) -> -spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}. decode(SerdeName, RawData) -> with_serde( - "decode_avro_json", - eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData]) + ?FUNCTION_NAME, + SerdeName, + [RawData] ). -spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}. encode(SerdeName, Data) -> with_serde( - "encode_avro_json", - eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data]) + ?FUNCTION_NAME, + SerdeName, + [Data] ). %%------------------------------------------------------------------------------------------------- @@ -209,9 +211,11 @@ ensure_serde_absent(Name) -> async_delete_serdes(Names) -> gen_server:cast(?MODULE, {delete_serdes, Names}). -with_serde(WhichOp, Fun) -> +with_serde(Op, SerdeName, Args) -> + WhichOp = which_op(Op), + ErrMsg = error_msg(Op), try - Fun() + eval_serde(Op, ErrMsg, SerdeName, Args) catch throw:Reason -> ?SLOG(error, Reason#{ @@ -233,18 +237,16 @@ with_serde(WhichOp, Fun) -> }} end. -eval_serde_fun(Op, ErrMsg, SerdeName, Args) -> - fun() -> - case lookup_serde(SerdeName) of - {ok, Serde} -> - eval_serde(Op, Serde, Args); - {error, not_found} -> - throw(#{ - error_msg => ErrMsg, - reason => plugin_serde_not_found, - serde_name => SerdeName - }) - end +eval_serde(Op, ErrMsg, SerdeName, Args) -> + case lookup_serde(SerdeName) of + {ok, Serde} -> + eval_serde(Op, Serde, Args); + {error, not_found} -> + throw(#{ + error_msg => ErrMsg, + reason => plugin_serde_not_found, + serde_name => SerdeName + }) end. eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> @@ -255,6 +257,12 @@ eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Dat eval_serde(_, _, _) -> throw(#{error_msg => "unexpected_plugin_avro_op"}). +which_op(Op) -> + atom_to_list(Op) ++ "_avro_json". + +error_msg(Op) -> + atom_to_list(Op) ++ "_avro_data". + read_avsc_file(Path) -> case file:read_file(Path) of {ok, Bin} -> From 11389bc086086359a2897234398c6235c0dd05a5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 11:12:24 +0800 Subject: [PATCH 18/22] fix: i18n file renamed --- apps/emqx_plugins/src/emqx_plugins.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index c4ef79e17..87932b2b7 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -1082,7 +1082,7 @@ avro_config_file(NameVsn) -> filename:join([plugin_config_dir(NameVsn), "config.avro"]). i18n_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "i18n.json"]). + filename:join([plugin_dir(NameVsn), "config_i18n.json"]). readme_file(NameVsn) -> filename:join([plugin_dir(NameVsn), "README.md"]). From 00cab33fde233e3fc305bccade96c273060b7ea8 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 11:49:12 +0800 Subject: [PATCH 19/22] fix: plugin's internal config api --- .../src/emqx_mgmt_api_plugins.erl | 5 +-- apps/emqx_plugins/src/emqx_plugins.erl | 38 ++++++++++++++----- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 9cc518dd0..375d3d2dd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -488,7 +488,7 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> plugin_config(get, #{bindings := #{name := NameVsn}}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}) of + case emqx_plugins:get_plugin_config(NameVsn) of {ok, AvroJson} -> {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; {error, _} -> @@ -503,8 +503,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) -> plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonBin) of + case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of {ok, AvroValueConfig} -> Nodes = emqx:running_nodes(), %% cluster call with config in map (binary key-value) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 87932b2b7..065ab701b 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -28,7 +28,8 @@ plugin_avsc/1, plugin_i18n/1, plugin_avro/1, - parse_name_vsn/1 + parse_name_vsn/1, + make_name_vsn_string/2 ]). %% Package operations @@ -49,19 +50,22 @@ ensure_started/1, ensure_stopped/0, ensure_stopped/1, - get_plugin_config/1, - get_plugin_config/2, - put_plugin_config/3, restart/1, list/0 ]). +%% Plugin config APIs +-export([ + get_plugin_config/1, + get_plugin_config/2, + get_plugin_config/3, + get_plugin_config/4, + put_plugin_config/3 +]). + %% Package utils -export([ decode_plugin_avro_config/2, - get_config/2, - put_config/2, - get_tar/1, install_dir/0, avsc_file_path/1 ]). @@ -73,6 +77,8 @@ %% Internal export -export([do_ensure_started/1]). +%% for test cases +-export([put_config/2]). -ifdef(TEST). -compile(export_all). @@ -124,6 +130,9 @@ parse_name_vsn(NameVsn) when is_list(NameVsn) -> _ -> {error, "bad_name_vsn"} end. +make_name_vsn_string(Name, Vsn) -> + binary_to_list(iolist_to_binary([Name, "-", Vsn])). + %%-------------------------------------------------------------------- %% Package operations @@ -246,21 +255,30 @@ ensure_stopped(NameVsn) -> end ). +get_plugin_config(Name, Vsn, Options, Default) -> + get_plugin_config(make_name_vsn_string(Name, Vsn), Options, Default). + -spec get_plugin_config(name_vsn()) -> - {ok, plugin_config()} | {error, term()}. + {ok, plugin_config()} + | {error, term()}. get_plugin_config(NameVsn) -> get_plugin_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). -spec get_plugin_config(name_vsn(), Options :: map()) -> {ok, avro_binary() | plugin_config()} | {error, term()}. + get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> + %% no default value when get raw binary config case read_plugin_avro(NameVsn) of {ok, _AvroJson} = Res -> Res; {error, _Reason} = Err -> Err end; -get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> - {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{})}. +get_plugin_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> + get_plugin_config(NameVsn, Options, #{}). + +get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> + {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}. %% @doc Update plugin's config. %% RPC call from Management API or CLI. From 5ff4e7690494dac7c5d445065fd9bcd906a011a6 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 18:06:28 +0800 Subject: [PATCH 20/22] refactor: rename plugins config api functions --- .../src/emqx_mgmt_api_plugins.erl | 4 +- .../test/emqx_mgmt_api_plugins_SUITE.erl | 6 +- apps/emqx_plugins/src/emqx_plugins.erl | 61 +++++++++---------- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 10 +-- apps/emqx_plugins/test/emqx_plugins_tests.erl | 4 +- 5 files changed, 42 insertions(+), 43 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 375d3d2dd..94b16320a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -488,7 +488,7 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> plugin_config(get, #{bindings := #{name := NameVsn}}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:get_plugin_config(NameVsn) of + case emqx_plugins:get_config(NameVsn) of {ok, AvroJson} -> {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; {error, _} -> @@ -601,7 +601,7 @@ ensure_action(Name, restart) -> %% for RPC plugin avro encoded config update do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) -> %% TODO: maybe use `PluginConfigMap` to validate config - emqx_plugins:put_plugin_config(Name, AvroJsonMap, PluginConfigMap). + emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl index 4e5dacc7a..e563ba262 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl @@ -37,10 +37,10 @@ init_per_suite(Config) -> ok = filelib:ensure_dir(WorkDir), DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"), DemoShDir = lists:flatten(string:replace(DemoShDir1, "emqx_management", "emqx_plugins")), - OrigInstallDir = emqx_plugins:get_config(install_dir, undefined), + OrigInstallDir = emqx_plugins:get_config_interal(install_dir, undefined), ok = filelib:ensure_dir(DemoShDir), emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]), - emqx_plugins:put_config(install_dir, DemoShDir), + emqx_plugins:put_config_internal(install_dir, DemoShDir), [{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config]. end_per_suite(Config) -> @@ -48,7 +48,7 @@ end_per_suite(Config) -> %% restore config case proplists:get_value(orig_install_dir, Config) of undefined -> ok; - OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir) + OrigInstallDir -> emqx_plugins:put_config_internal(install_dir, OrigInstallDir) end, emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]), ok. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 065ab701b..11edac123 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -56,11 +56,11 @@ %% Plugin config APIs -export([ - get_plugin_config/1, - get_plugin_config/2, - get_plugin_config/3, - get_plugin_config/4, - put_plugin_config/3 + get_config/1, + get_config/2, + get_config/3, + get_config/4, + put_config/3 ]). %% Package utils @@ -78,7 +78,7 @@ %% Internal export -export([do_ensure_started/1]). %% for test cases --export([put_config/2]). +-export([put_config_internal/2]). -ifdef(TEST). -compile(export_all). @@ -255,35 +255,34 @@ ensure_stopped(NameVsn) -> end ). -get_plugin_config(Name, Vsn, Options, Default) -> - get_plugin_config(make_name_vsn_string(Name, Vsn), Options, Default). +get_config(Name, Vsn, Options, Default) -> + get_config(make_name_vsn_string(Name, Vsn), Options, Default). --spec get_plugin_config(name_vsn()) -> +-spec get_config(name_vsn()) -> {ok, plugin_config()} | {error, term()}. -get_plugin_config(NameVsn) -> - get_plugin_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). +get_config(NameVsn) -> + get_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). --spec get_plugin_config(name_vsn(), Options :: map()) -> +-spec get_config(name_vsn(), Options :: map()) -> {ok, avro_binary() | plugin_config()} | {error, term()}. - -get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> +get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> %% no default value when get raw binary config case read_plugin_avro(NameVsn) of {ok, _AvroJson} = Res -> Res; {error, _Reason} = Err -> Err end; -get_plugin_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> - get_plugin_config(NameVsn, Options, #{}). +get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> + get_config(NameVsn, Options, #{}). -get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> +get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}. %% @doc Update plugin's config. %% RPC call from Management API or CLI. -%% the avro binary and plugin config ALWAYS be valid before calling this function. -put_plugin_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> +%% the avro Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = write_avro_bin(NameVsn, AvroJsonBin), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), @@ -328,13 +327,13 @@ decode_plugin_avro_config(NameVsn, AvroJsonBin) -> {error, ReasonMap} -> {error, ReasonMap} end. -get_config(Key, Default) when is_atom(Key) -> - get_config([Key], Default); -get_config(Path, Default) -> +get_config_interal(Key, Default) when is_atom(Key) -> + get_config_interal([Key], Default); +get_config_interal(Path, Default) -> emqx_conf:get([?CONF_ROOT | Path], Default). -put_config(Key, Value) -> - do_put_config(Key, Value, _ConfLocation = local). +put_config_internal(Key, Value) -> + do_put_config_internal(Key, Value, _ConfLocation = local). -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}. get_tar(NameVsn) -> @@ -950,16 +949,16 @@ is_needed_by(AppToStop, RunningApp) -> undefined -> false end. -do_put_config(Key, Value, ConfLocation) when is_atom(Key) -> - do_put_config([Key], Value, ConfLocation); -do_put_config(Path, Values, _ConfLocation = local) when is_list(Path) -> +do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) -> + do_put_config_internal([Key], Value, ConfLocation); +do_put_config_internal(Path, Values, _ConfLocation = local) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, %% Already in cluster_rpc, don't use emqx_conf:update, dead calls case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end; -do_put_config(Path, Values, _ConfLocation = global) when is_list(Path) -> +do_put_config_internal(Path, Values, _ConfLocation = global) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; @@ -995,16 +994,16 @@ enable_disable_plugin(_NameVsn, _Diff) -> %%-------------------------------------------------------------------- install_dir() -> - get_config(install_dir, ""). + get_config_interal(install_dir, ""). put_configured(Configured) -> put_configured(Configured, _ConfLocation = local). put_configured(Configured, ConfLocation) -> - ok = do_put_config(states, bin_key(Configured), ConfLocation). + ok = do_put_config_internal(states, bin_key(Configured), ConfLocation). configured() -> - get_config(states, []). + get_config_interal(states, []). for_plugins(ActionFun) -> case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index d7bdfad13..80f3d7a48 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -626,9 +626,9 @@ group_t_copy_plugin_to_a_new_node({init, Config}) -> } ), [CopyFromNode] = emqx_cth_cluster:start([SpecCopyFrom#{join_to => undefined}]), - ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]), + ok = rpc:call(CopyFromNode, emqx_plugins, put_config_internal, [install_dir, FromInstallDir]), [CopyToNode] = emqx_cth_cluster:start([SpecCopyTo#{join_to => undefined}]), - ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]), + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [install_dir, ToInstallDir]), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]), ok = rpc:call(CopyFromNode, emqx_plugins, ensure_started, [NameVsn]), @@ -658,7 +658,7 @@ group_t_copy_plugin_to_a_new_node(Config) -> CopyFromNode = proplists:get_value(copy_from_node, Config), CopyToNode = proplists:get_value(copy_to_node, Config), CopyToDir = proplists:get_value(to_install_dir, Config), - CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config, [[states], []]), + CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config_interal, [[states], []]), NameVsn = proplists:get_value(name_vsn, Config), PluginName = proplists:get_value(plugin_name, Config), PluginApp = list_to_atom(PluginName), @@ -681,7 +681,7 @@ group_t_copy_plugin_to_a_new_node(Config) -> ), ok = rpc:call(CopyToNode, ekka, join, [CopyFromNode]), %% Mimic cluster-override conf copying - ok = rpc:call(CopyToNode, emqx_plugins, put_config, [[states], CopyFromPluginsState]), + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]), %% Plugin copying is triggered upon app restart on a new node. %% This is similar to emqx_conf, which copies cluster-override conf upon start, %% see: emqx_conf_app:init_conf/0 @@ -734,7 +734,7 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) -> %% successfully even if it's not extracted yet. Simply starting %% the node would crash if not working properly. ct:pal("~p config:\n ~p", [ - CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config, [[], #{}]) + CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config_interal, [[], #{}]) ]), ct:pal("~p install_dir:\n ~p", [ CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir]) diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index 4911c174e..1ae0bcef3 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -72,12 +72,12 @@ with_rand_install_dir(F) -> TmpDir = integer_to_list(N), OriginalInstallDir = emqx_plugins:install_dir(), ok = filelib:ensure_dir(filename:join([TmpDir, "foo"])), - ok = emqx_plugins:put_config(install_dir, TmpDir), + ok = emqx_plugins:put_config_internal(install_dir, TmpDir), try F(TmpDir) after file:del_dir_r(TmpDir), - ok = emqx_plugins:put_config(install_dir, OriginalInstallDir) + ok = emqx_plugins:put_config_internal(install_dir, OriginalInstallDir) end. write_file(Path, Content) -> From 43ac4f5dfeb091ce63a323ad686eba26439127d3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 21:17:01 +0800 Subject: [PATCH 21/22] fix: make bpapi check happy --- apps/emqx_plugins/src/emqx_plugins.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 11edac123..67d25bf7a 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -75,6 +75,9 @@ post_config_update/5 ]). +%% RPC call +-export([get_tar/1]). + %% Internal export -export([do_ensure_started/1]). %% for test cases From b98f3d27b88dd64336f01d65e8c1737c5bdd7d40 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Apr 2024 22:30:08 +0800 Subject: [PATCH 22/22] docs: add change log entry for #12910 --- changes/feat-12910.en.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 changes/feat-12910.en.md diff --git a/changes/feat-12910.en.md b/changes/feat-12910.en.md new file mode 100644 index 000000000..d31fa7ef7 --- /dev/null +++ b/changes/feat-12910.en.md @@ -0,0 +1,15 @@ +Provided a configuration API endpoint for plugin functionality. +This allows users to describe the configuration struct of their plugins using AVRO schema. +During plugin runtime, the plugin's configuration can be accessed via the API. + +Added new API endpoints: +- `/plugins/:name/schema` + To get plugins avro schema and i18n config in one json object. +- `/plugins/:name/config` + To get or update plugin's own config + +Changed API endpoints: +- `/plugins/install` + Status code when succeeded change to `204`. It was `200` previously. +- `/plugins/:name/move` + Status code when succeeded change to `204`. It was `200` previously.