diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index a2913c37a..10d27fc63 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -46,6 +46,7 @@ {emqx_metrics,2}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. +{emqx_mgmt_api_plugins,3}. {emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,2}. {emqx_mgmt_cluster,3}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 49905540d..63a8ba517 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -19,7 +19,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -%%-include_lib("emqx_plugins/include/emqx_plugins.hrl"). +-include_lib("emqx_plugins/include/emqx_plugins.hrl"). -export([ api_spec/0, @@ -34,6 +34,8 @@ upload_install/2, plugin/2, update_plugin/2, + plugin_config/2, + plugin_schema/2, update_boot_order/2 ]). @@ -43,7 +45,8 @@ install_package/2, delete_package/1, describe_package/1, - ensure_action/2 + ensure_action/2, + do_update_plugin_config/3 ]). -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$"). @@ -52,7 +55,11 @@ %% app_name must be a snake_case (no '-' allowed). -define(VSN_WILDCARD, "-*.tar.gz"). -namespace() -> "plugins". +-define(CONTENT_PLUGIN, plugin). +-define(CONTENT_CONFIG, config). + +namespace() -> + "plugins". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -64,6 +71,8 @@ paths() -> "/plugins/:name", "/plugins/install", "/plugins/:name/:action", + "/plugins/:name/config", + "/plugins/:name/schema", "/plugins/:name/move" ]. @@ -97,10 +106,10 @@ schema("/plugins/install") -> schema => #{ type => object, properties => #{ - plugin => #{type => string, format => binary} + ?CONTENT_PLUGIN => #{type => string, format => binary} } }, - encoding => #{plugin => #{'contentType' => 'application/gzip'}} + encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}} } } }, @@ -157,6 +166,70 @@ schema("/plugins/:name/:action") -> } } }; +schema("/plugins/:name/config") -> + #{ + 'operationId' => plugin_config, + get => #{ + summary => + <<"Get plugin config">>, + description => + "Get plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + responses => #{ + %% binary avro encoded config + 200 => hoconsc:mk(binary()), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + }, + put => #{ + summary => + <<"Update plugin config">>, + description => + "Update plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.
", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + 'requestBody' => #{ + content => #{ + 'multipart/form-data' => #{ + schema => #{ + type => object, + properties => #{ + ?CONTENT_CONFIG => #{type => string, format => binary} + } + }, + encoding => #{?CONTENT_CONFIG => #{'contentType' => 'application/gzip'}} + } + } + }, + responses => #{ + 204 => <<"Config updated successfully">>, + 400 => emqx_dashboard_swagger:error_codes( + ['UNEXPECTED_ERROR'], <<"Update plugin config failed">> + ), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + } + }; +schema("/plugins/:name/schema") -> + #{ + 'operationId' => plugin_schema, + get => #{ + summary => <<"Get installed plugin's avro schema">>, + description => + "Get plugin's config avro schema.", + tags => ?TAGS, + parameters => [hoconsc:ref(name)], + responses => #{ + %% avro schema and i18n json object + 200 => hoconsc:mk(binary()), + 404 => emqx_dashboard_swagger:error_codes( + ['NOT_FOUND', 'FILE_NOT_EXISTED'], + <<"Plugin Not Found or Plugin not given a schema file">> + ) + } + } + }; schema("/plugins/:name/move") -> #{ 'operationId' => update_boot_order, @@ -338,7 +411,7 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) - %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall NameVsn = string:trim(FileName, trailing, ".tar.gz"), case emqx_plugins:describe(NameVsn) of - {error, #{error := "bad_info_file", return := {enoent, _}}} -> + {error, #{error_msg := "bad_info_file", reason := {enoent, _}}} -> case emqx_plugins:parse_name_vsn(FileName) of {ok, AppName, _Vsn} -> AppDir = filename:join(emqx_plugins:install_dir(), AppName), @@ -394,7 +467,7 @@ do_install_package(FileName, Bin) -> ), Reason = case hd(Filtered) of - {error, #{error := Reason0}} -> Reason0; + {error, #{error_msg := Reason0}} -> Reason0; {error, #{reason := Reason0}} -> Reason0 end, {400, #{ @@ -418,6 +491,42 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), return(204, Res). +plugin_config(get, #{bindings := #{name := Name}}) -> + case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_AVRO}) of + {ok, AvroBin} -> + {200, #{<<"content-type">> => <<"application/octet-stream">>}, AvroBin}; + {error, _} -> + {400, #{ + code => 'BAD_CONFIG', + message => <<"Failed to get plugin config">> + }} + end; +plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawAvro}}) -> + case emqx_plugins:decode_plugin_avro_config(Name, RawAvro) of + {ok, Config} -> + Nodes = emqx:running_nodes(), + _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( + Nodes, Name, RawAvro, Config + ), + {204}; + {error, Reason} -> + {400, #{ + code => 'BAD_CONFIG', + message => readable_error_msg(Reason) + }} + end. + +plugin_schema(get, #{bindings := #{name := NameVsn}}) -> + case emqx_plugins:describe(NameVsn) of + {ok, _Plugin} -> + {200, format_plugin_schema_with_i18n(NameVsn)}; + _ -> + {404, #{ + code => 'NOT_FOUND', + message => <<"Plugin Not Found">> + }} + end. + update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> case parse_position(Body, Name) of {error, Reason} -> @@ -429,7 +538,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> {error, Reason} -> {400, #{ code => 'MOVE_FAILED', - message => iolist_to_binary(io_lib:format("~p", [Reason])) + message => readable_error_msg(Reason) }} end end. @@ -443,7 +552,7 @@ install_package(FileName, Bin) -> ok = file:write_file(File, Bin), PackageName = string:trim(FileName, trailing, ".tar.gz"), case emqx_plugins:ensure_installed(PackageName) of - {error, #{return := not_found}} = NotFound -> + {error, #{reason := not_found}} = NotFound -> NotFound; {error, Reason} = Error -> ?SLOG(error, Reason#{msg => "failed_to_install_plugin"}), @@ -454,9 +563,9 @@ install_package(FileName, Bin) -> end. %% For RPC plugin get -describe_package(Name) -> +describe_package(NameVsn) -> Node = node(), - case emqx_plugins:describe(Name) of + case emqx_plugins:describe(NameVsn) of {ok, Plugin} -> {Node, [Plugin]}; _ -> {Node, []} end. @@ -487,12 +596,25 @@ ensure_action(Name, restart) -> _ = emqx_plugins:restart(Name), ok. +%% for RPC plugin avro encoded config update +do_update_plugin_config(Name, Avro, PluginConfig) -> + emqx_plugins:put_plugin_config(Name, Avro, PluginConfig). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + return(Code, ok) -> {Code}; -return(_, {error, #{error := "bad_info_file", return := {enoent, _} = Reason}}) -> - {404, #{code => 'NOT_FOUND', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}; +return(_, {error, #{error_msg := "bad_info_file", reason := {enoent, _} = Reason}}) -> + {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}}; +return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = Reason}}) -> + {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}}; return(_, {error, Reason}) -> - {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. + {400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}. + +readable_error_msg(Msg) -> + emqx_utils:readable_error_msg(Msg). parse_position(#{<<"position">> := <<"front">>}, _) -> front; @@ -563,6 +685,18 @@ aggregate_status([{Node, Plugins} | List], Acc) -> ), aggregate_status(List, NewAcc). +format_plugin_schema_with_i18n(NameVsn) -> + #{ + avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + }. + +try_read_file(Fun) -> + case Fun() of + {ok, Bin} -> Bin; + _ -> null + end. + % running_status: running loaded, stopped %% config_status: not_configured disable enable plugin_status(#{running_status := running}) -> running; diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl index 781096af0..91d4674f9 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl @@ -24,6 +24,7 @@ describe_package/2, delete_package/1, ensure_action/2 + %% plugin_config/2 ]). -include_lib("emqx/include/bpapi.hrl"). diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl new file mode 100644 index 000000000..13c13bae7 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_plugins/1, + install_package/3, + describe_package/2, + delete_package/1, + ensure_action/2, + update_plugin_config/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec get_plugins([node()]) -> emqx_rpc:multicall_result(). +get_plugins(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000). + +-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result(). +install_package(Nodes, Filename, Bin) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000). + +-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result(). +describe_package(Nodes, Name) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000). + +-spec delete_package(binary() | string()) -> ok | {error, any()}. +delete_package(Name) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). + +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}. +ensure_action(Name, Action) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). + +-spec update_plugin_config( + [node()], + binary() | string(), + binary(), + map() +) -> + emqx_rpc:multicall_result(). +update_plugin_config(Nodes, Name, RawAvro, PluginConfig) -> + rpc:multicall( + Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, [Name, RawAvro, PluginConfig], 10000 + ). diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index 05959e46f..95dc50e4f 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -19,4 +19,25 @@ -define(CONF_ROOT, plugins). +-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab). + +-define(CONFIG_FORMAT_AVRO, config_format_avro). +-define(CONFIG_FORMAT_MAP, config_format_map). + +-type schema_name() :: binary(). +-type avsc() :: binary(). + +-type encoded_data() :: iodata(). +-type decoded_data() :: map(). + +-record(plugin_schema_serde, { + name :: schema_name(), + eval_context :: term(), + %% TODO: fields to mark schema import status + %% scheam_imported :: boolean(), + %% for future use + extra = [] +}). +-type plugin_schema_serde() :: #plugin_schema_serde{}. + -endif. diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index b26836475..0e0945c7f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.1.8"}, + {vsn, "0.2.0"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index ebe5b7932..ce66fc94d 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -16,7 +16,6 @@ -module(emqx_plugins). --include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl"). -include("emqx_plugins.hrl"). @@ -24,6 +23,15 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +-export([ + describe/1, + plugin_avsc/1, + plugin_i18n/1, + plugin_avro/1, + parse_name_vsn/1 +]). + +%% Package operations -export([ ensure_installed/1, ensure_uninstalled/1, @@ -35,21 +43,26 @@ delete_package/1 ]). +%% Plugin runtime management -export([ ensure_started/0, ensure_started/1, ensure_stopped/0, ensure_stopped/1, + get_plugin_config/1, + get_plugin_config/2, + put_plugin_config/3, restart/1, - list/0, - describe/1, - parse_name_vsn/1 + list/0 ]). +%% Package utils -export([ + decode_plugin_avro_config/2, get_config/2, put_config/2, - get_tar/1 + get_tar/1, + install_dir/0 ]). %% `emqx_config_handler' API @@ -57,21 +70,26 @@ post_config_update/5 ]). -%% internal +%% Internal export -export([do_ensure_started/1]). --export([ - install_dir/0 -]). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. +%% Defines +-define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}). + +%% Types %% "my_plugin-0.1.0" -type name_vsn() :: binary() | string(). %% the parse result of the JSON info file -type plugin() :: map(). +-type schema_json() :: map(). +-type i18n_json() :: map(). +-type avro_binary() :: binary(). +-type plugin_config() :: map(). -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. %%-------------------------------------------------------------------- @@ -80,12 +98,36 @@ %% @doc Describe a plugin. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. -describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}). +describe(NameVsn) -> + read_plugin_info(NameVsn, #{fill_readme => true}). + +-spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}. +plugin_avsc(NameVsn) -> + read_plugin_avsc(NameVsn). + +-spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}. +plugin_i18n(NameVsn) -> + read_plugin_i18n(NameVsn). + +-spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}. +plugin_avro(NameVsn) -> + read_plugin_avro(NameVsn). + +parse_name_vsn(NameVsn) when is_binary(NameVsn) -> + parse_name_vsn(binary_to_list(NameVsn)); +parse_name_vsn(NameVsn) when is_list(NameVsn) -> + case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of + {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn}; + _ -> {error, "bad_name_vsn"} + end. + +%%-------------------------------------------------------------------- +%% Package operations %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, map()}. ensure_installed(NameVsn) -> - case read_plugin(NameVsn, #{}) of + case read_plugin_info(NameVsn, #{}) of {ok, _} -> ok; {error, _} -> @@ -93,33 +135,183 @@ ensure_installed(NameVsn) -> do_ensure_installed(NameVsn) end. -do_ensure_installed(NameVsn) -> - TarGz = pkg_file(NameVsn), - case erl_tar:extract(TarGz, [compressed, memory]) of - {ok, TarContent} -> - ok = write_tar_file_content(install_dir(), TarContent), - case read_plugin(NameVsn, #{}) of - {ok, _} -> - ok; - {error, Reason} -> - ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), - ok = delete_tar_file_content(install_dir(), TarContent), - {error, Reason} - end; - {error, {_, enoent}} -> +%% @doc Ensure files and directories for the given plugin are being deleted. +%% If a plugin is running, or enabled, an error is returned. +-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. +ensure_uninstalled(NameVsn) -> + case read_plugin_info(NameVsn, #{}) of + {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> {error, #{ - reason => "failed_to_extract_plugin_package", - path => TarGz, - return => not_found + error_msg => "bad_plugin_running_status", + hint => "stop_the_plugin_first" }}; - {error, Reason} -> + {ok, #{config_status := enabled}} -> {error, #{ - reason => "bad_plugin_package", - path => TarGz, - return => Reason - }} + error_msg => "bad_plugin_config_status", + hint => "disable_the_plugin_first" + }}; + _ -> + purge(NameVsn), + ensure_delete(NameVsn) end. +%% @doc Ensure a plugin is enabled to the end of the plugins list. +-spec ensure_enabled(name_vsn()) -> ok | {error, any()}. +ensure_enabled(NameVsn) -> + ensure_enabled(NameVsn, no_move). + +%% @doc Ensure a plugin is enabled at the given position of the plugin list. +-spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}. +ensure_enabled(NameVsn, Position) -> + ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local). + +-spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}. +ensure_enabled(NameVsn, Position, ConfLocation) when + ConfLocation =:= local; ConfLocation =:= global +-> + ensure_state(NameVsn, Position, _Enabled = true, ConfLocation). + +%% @doc Ensure a plugin is disabled. +-spec ensure_disabled(name_vsn()) -> ok | {error, any()}. +ensure_disabled(NameVsn) -> + ensure_state(NameVsn, no_move, false, _ConfLocation = local). + +%% @doc Delete extracted dir +%% In case one lib is shared by multiple plugins. +%% it might be the case that purging one plugin's install dir +%% will cause deletion of loaded beams. +%% It should not be a problem, because shared lib should +%% reside in all the plugin install dirs. +-spec purge(name_vsn()) -> ok. +purge(NameVsn) -> + _ = maybe_purge_plugin_config(NameVsn), + purge_plugin(NameVsn). + +%% @doc Delete the package file. +-spec delete_package(name_vsn()) -> ok. +delete_package(NameVsn) -> + File = pkg_file(NameVsn), + _ = emqx_plugins_serde:delete_schema(NameVsn), + case file:delete(File) of + ok -> + ?SLOG(info, #{msg => "purged_plugin_dir", path => File}), + ok; + {error, enoent} -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_delete_package_file", + path => File, + reason => Reason + }), + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Plugin runtime management + +%% @doc Start all configured plugins are started. +-spec ensure_started() -> ok. +ensure_started() -> + ok = for_plugins(fun ?MODULE:do_ensure_started/1). + +%% @doc Start a plugin from Management API or CLI. +%% the input is a - string. +-spec ensure_started(name_vsn()) -> ok | {error, term()}. +ensure_started(NameVsn) -> + case do_ensure_started(NameVsn) of + ok -> + ok; + {error, Reason} -> + ?SLOG(alert, Reason#{msg => "failed_to_start_plugin"}), + {error, Reason} + end. + +%% @doc Stop all plugins before broker stops. +-spec ensure_stopped() -> ok. +ensure_stopped() -> + for_plugins(fun ?MODULE:ensure_stopped/1). + +%% @doc Stop a plugin from Management API or CLI. +-spec ensure_stopped(name_vsn()) -> ok | {error, term()}. +ensure_stopped(NameVsn) -> + tryit( + "stop_plugin", + fun() -> + Plugin = do_read_plugin(NameVsn), + ensure_apps_stopped(Plugin) + end + ). + +-spec get_plugin_config(name_vsn()) -> + {ok, plugin_config()} | {error, term()}. +get_plugin_config(NameVsn) -> + get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}). + +-spec get_plugin_config(name_vsn(), Options :: map()) -> + {ok, avro_binary() | plugin_config()} + | {error, term()}. +get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> + case read_plugin_avro(NameVsn) of + {ok, _AvroBin} = Res -> Res; + {error, _Reason} = Err -> Err + end; +get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> + persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{}). + +%% @doc Update plugin's config. +%% RPC call from Management API or CLI. +%% the avro binary and plugin config ALWAYS be valid before calling this function. +put_plugin_config(NameVsn, RawAvro, PluginConfig) -> + ok = write_avro_bin(NameVsn, RawAvro), + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), PluginConfig), + ok. + +%% @doc Stop and then start the plugin. +restart(NameVsn) -> + case ensure_stopped(NameVsn) of + ok -> ensure_started(NameVsn); + {error, Reason} -> {error, Reason} + end. + +%% @doc List all installed plugins. +%% Including the ones that are installed, but not enabled in config. +-spec list() -> [plugin()]. +list() -> + Pattern = filename:join([install_dir(), "*", "release.json"]), + All = lists:filtermap( + fun(JsonFilePath) -> + [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)), + case read_plugin_info(NameVsn, #{}) of + {ok, Info} -> + {true, Info}; + {error, Reason} -> + ?SLOG(warning, Reason), + false + end + end, + filelib:wildcard(Pattern) + ), + do_list(configured(), All). + +%%-------------------------------------------------------------------- +%% Package utils + +-spec decode_plugin_avro_config(name_vsn(), binary()) -> {ok, map()} | {error, any()}. +decode_plugin_avro_config(NameVsn, RawAvro) -> + case emqx_plugins_serde:decode(NameVsn, RawAvro) of + {ok, Config} -> {ok, Config}; + {error, ReasonMap} -> {error, ReasonMap} + end. + +get_config(Key, Default) when is_atom(Key) -> + get_config([Key], Default); +get_config(Path, Default) -> + emqx_conf:get([?CONF_ROOT | Path], Default). + +put_config(Key, Value) -> + do_put_config(Key, Value, _ConfLocation = local). + -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}. get_tar(NameVsn) -> TarGz = pkg_file(NameVsn), @@ -135,10 +327,14 @@ get_tar(NameVsn) -> end end. +%%-------------------------------------------------------------------- +%% Internal +%%-------------------------------------------------------------------- + maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) -> maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir)); maybe_create_tar(NameVsn, TarGzName, InstallDir) -> - case filelib:wildcard(filename:join(dir(NameVsn), "**")) of + case filelib:wildcard(filename:join(plugin_dir(NameVsn), "**")) of [_ | _] = PluginFiles -> InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/", PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles], @@ -207,24 +403,32 @@ top_dir_test_() -> ]. -endif. -%% @doc Ensure files and directories for the given plugin are being deleted. -%% If a plugin is running, or enabled, an error is returned. --spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. -ensure_uninstalled(NameVsn) -> - case read_plugin(NameVsn, #{}) of - {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> +do_ensure_installed(NameVsn) -> + TarGz = pkg_file(NameVsn), + case erl_tar:extract(TarGz, [compressed, memory]) of + {ok, TarContent} -> + ok = write_tar_file_content(install_dir(), TarContent), + case read_plugin_info(NameVsn, #{}) of + {ok, _} -> + ok = maybe_post_op_after_install(NameVsn), + ok; + {error, Reason} -> + ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), + ok = delete_tar_file_content(install_dir(), TarContent), + {error, Reason} + end; + {error, {_, enoent}} -> {error, #{ - reason => "bad_plugin_running_status", - hint => "stop_the_plugin_first" + error_msg => "failed_to_extract_plugin_package", + path => TarGz, + reason => not_found }}; - {ok, #{config_status := enabled}} -> + {error, Reason} -> {error, #{ - reason => "bad_plugin_config_status", - hint => "disable_the_plugin_first" - }}; - _ -> - purge(NameVsn), - ensure_delete(NameVsn) + error_msg => "bad_plugin_package", + path => TarGz, + reason => Reason + }} end. ensure_delete(NameVsn0) -> @@ -233,37 +437,19 @@ ensure_delete(NameVsn0) -> put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)), ok. -%% @doc Ensure a plugin is enabled to the end of the plugins list. --spec ensure_enabled(name_vsn()) -> ok | {error, any()}. -ensure_enabled(NameVsn) -> - ensure_enabled(NameVsn, no_move). - -%% @doc Ensure a plugin is enabled at the given position of the plugin list. --spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}. -ensure_enabled(NameVsn, Position) -> - ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local). - --spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}. -ensure_enabled(NameVsn, Position, ConfLocation) when - ConfLocation =:= local; ConfLocation =:= global --> - ensure_state(NameVsn, Position, _Enabled = true, ConfLocation). - -%% @doc Ensure a plugin is disabled. --spec ensure_disabled(name_vsn()) -> ok | {error, any()}. -ensure_disabled(NameVsn) -> - ensure_state(NameVsn, no_move, false, _ConfLocation = local). - ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) -> ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation); ensure_state(NameVsn, Position, State, ConfLocation) -> - case read_plugin(NameVsn, #{}) of + case read_plugin_info(NameVsn, #{}) of {ok, _} -> Item = #{ name_vsn => NameVsn, enable => State }, - tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end); + tryit( + "ensure_state", + fun() -> ensure_configured(Item, Position, ConfLocation) end + ); {error, Reason} -> {error, Reason} end. @@ -295,7 +481,7 @@ add_new_configured(Configured, {Action, NameVsn}, Item) -> {Front, Rear} = lists:splitwith(SplitFun, Configured), Rear =:= [] andalso throw(#{ - error => "position_anchor_plugin_not_configured", + error_msg => "position_anchor_plugin_not_configured", hint => "maybe_install_and_configure", name_vsn => NameVsn }), @@ -307,37 +493,21 @@ add_new_configured(Configured, {Action, NameVsn}, Item) -> Front ++ [Anchor, Item | Rear0] end. -%% @doc Delete the package file. --spec delete_package(name_vsn()) -> ok. -delete_package(NameVsn) -> - File = pkg_file(NameVsn), - case file:delete(File) of - ok -> - ?SLOG(info, #{msg => "purged_plugin_dir", path => File}), - ok; - {error, enoent} -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_delete_package_file", - path => File, - reason => Reason - }), - {error, Reason} - end. +maybe_purge_plugin_config(NameVsn) -> + _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn)), + ok. -%% @doc Delete extracted dir -%% In case one lib is shared by multiple plugins. -%% it might be the case that purging one plugin's install dir -%% will cause deletion of loaded beams. -%% It should not be a problem, because shared lib should -%% reside in all the plugin install dirs. --spec purge(name_vsn()) -> ok. -purge(NameVsn) -> - Dir = dir(NameVsn), +purge_plugin(NameVsn) -> + Dir = plugin_dir(NameVsn), + purge_plugin_dir(Dir). + +purge_plugin_dir(Dir) -> case file:del_dir_r(Dir) of ok -> - ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir}); + ?SLOG(info, #{ + msg => "purged_plugin_dir", + dir => Dir + }); {error, enoent} -> ok; {error, Reason} -> @@ -349,72 +519,10 @@ purge(NameVsn) -> {error, Reason} end. -%% @doc Start all configured plugins are started. --spec ensure_started() -> ok. -ensure_started() -> - ok = for_plugins(fun ?MODULE:do_ensure_started/1). - -%% @doc Start a plugin from Management API or CLI. -%% the input is a - string. --spec ensure_started(name_vsn()) -> ok | {error, term()}. -ensure_started(NameVsn) -> - case do_ensure_started(NameVsn) of - ok -> - ok; - {error, Reason} -> - ?SLOG(alert, #{ - msg => "failed_to_start_plugin", - reason => Reason - }), - {error, Reason} - end. - -%% @doc Stop all plugins before broker stops. --spec ensure_stopped() -> ok. -ensure_stopped() -> - for_plugins(fun ?MODULE:ensure_stopped/1). - -%% @doc Stop a plugin from Management API or CLI. --spec ensure_stopped(name_vsn()) -> ok | {error, term()}. -ensure_stopped(NameVsn) -> - tryit( - "stop_plugin", - fun() -> - Plugin = do_read_plugin(NameVsn), - ensure_apps_stopped(Plugin) - end - ). - -%% @doc Stop and then start the plugin. -restart(NameVsn) -> - case ensure_stopped(NameVsn) of - ok -> ensure_started(NameVsn); - {error, Reason} -> {error, Reason} - end. - -%% @doc List all installed plugins. -%% Including the ones that are installed, but not enabled in config. --spec list() -> [plugin()]. -list() -> - Pattern = filename:join([install_dir(), "*", "release.json"]), - All = lists:filtermap( - fun(JsonFile) -> - case read_plugin({file, JsonFile}, #{}) of - {ok, Info} -> - {true, Info}; - {error, Reason} -> - ?SLOG(warning, Reason), - false - end - end, - filelib:wildcard(Pattern) - ), - list(configured(), All). - %% Make sure configured ones are ordered in front. -list([], All) -> +do_list([], All) -> All; -list([#{name_vsn := NameVsn} | Rest], All) -> +do_list([#{name_vsn := NameVsn} | Rest], All) -> SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) -> bin([Name, "-", Vsn]) =/= bin(NameVsn) end, @@ -424,9 +532,9 @@ list([#{name_vsn := NameVsn} | Rest], All) -> msg => "configured_plugin_not_installed", name_vsn => NameVsn }), - list(Rest, All); + do_list(Rest, All); {Front, [I | Rear]} -> - [I | list(Rest, Front ++ Rear)] + [I | do_list(Rest, Front ++ Rear)] end. do_ensure_started(NameVsn) -> @@ -439,23 +547,26 @@ do_ensure_started(NameVsn) -> ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ - msg => "plugin_not_found", + error_msg => "plugin_not_found", name_vsn => NameVsn - }) + }), + ok end end ). +%%-------------------------------------------------------------------- + %% try the function, catch 'throw' exceptions as normal 'error' return %% other exceptions with stacktrace logged. tryit(WhichOp, F) -> try F() catch - throw:Reason -> + throw:ReasonMap -> %% thrown exceptions are known errors %% translate to a return value without stacktrace - {error, Reason}; + {error, ReasonMap}; error:Reason:Stacktrace -> %% unexpected errors, log stacktrace ?SLOG(warning, #{ @@ -469,33 +580,44 @@ tryit(WhichOp, F) -> %% read plugin info from the JSON file %% returns {ok, Info} or {error, Reason} -read_plugin(NameVsn, Options) -> +read_plugin_info(NameVsn, Options) -> tryit( - "read_plugin_info", - fun() -> {ok, do_read_plugin(NameVsn, Options)} end + atom_to_list(?FUNCTION_NAME), + fun() -> {ok, do_read_plugin2(NameVsn, Options)} end ). -do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}). +do_read_plugin(NameVsn) -> + do_read_plugin2(NameVsn, #{}). -do_read_plugin({file, InfoFile}, Options) -> - [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)), - case hocon:load(InfoFile, #{format => richmap}) of - {ok, RichMap} -> - Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), - Info1 = plugins_readme(NameVsn, Options, Info0), - plugin_status(NameVsn, Info1); - {error, Reason} -> - throw(#{ - error => "bad_info_file", - path => InfoFile, - return => Reason - }) - end; -do_read_plugin(NameVsn, Options) -> - do_read_plugin({file, info_file(NameVsn)}, Options). +do_read_plugin2(NameVsn, Option) -> + do_read_plugin3(NameVsn, info_file(NameVsn), Option). + +do_read_plugin3(NameVsn, InfoFilePath, Options) -> + {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file"))(), + Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), + Info1 = plugins_readme(NameVsn, Options, Info0), + plugin_status(NameVsn, Info1). + +read_plugin_avsc(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(schema_file(NameVsn), "bad_avsc_file") + ). + +read_plugin_i18n(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(i18n_file(NameVsn), "bad_i18n_file") + ). + +read_plugin_avro(NameVsn) -> + tryit( + atom_to_list(?FUNCTION_NAME), + read_file_fun(schema_file(NameVsn), "bad_avro_file") + ). ensure_exists_and_installed(NameVsn) -> - case filelib:is_dir(dir(NameVsn)) of + case filelib:is_dir(plugin_dir(NameVsn)) of true -> ok; false -> @@ -581,10 +703,6 @@ plugin_status(NameVsn, Info) -> config_status => ConfSt }. -bin(A) when is_atom(A) -> atom_to_binary(A, utf8); -bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); -bin(B) when is_binary(B) -> B. - check_plugin( #{ <<"name">> := Name, @@ -593,7 +711,7 @@ check_plugin( <<"description">> := _ } = Info, NameVsn, - File + FilePath ) -> case bin(NameVsn) =:= bin([Name, "-", Vsn]) of true -> @@ -605,7 +723,7 @@ check_plugin( catch _:_ -> throw(#{ - error => "bad_rel_apps", + error_msg => "bad_rel_apps", rel_apps => Apps, hint => "A non-empty string list of app_name-app_vsn format" }) @@ -613,16 +731,16 @@ check_plugin( Info; false -> throw(#{ - error => "name_vsn_mismatch", + error_msg => "name_vsn_mismatch", name_vsn => NameVsn, - path => File, + path => FilePath, name => Name, rel_vsn => Vsn }) end; check_plugin(_What, NameVsn, File) -> throw(#{ - error => "bad_info_file_content", + error_msg => "bad_info_file_content", mandatory_fields => [rel_vsn, name, rel_apps, description], name_vsn => NameVsn, path => File @@ -678,7 +796,7 @@ do_load_plugin_app(AppName, Ebin) -> ok; {error, Reason} -> throw(#{ - error => "failed_to_load_plugin_beam", + error_msg => "failed_to_load_plugin_beam", path => BeamFile, reason => Reason }) @@ -693,7 +811,7 @@ do_load_plugin_app(AppName, Ebin) -> ok; {error, Reason} -> throw(#{ - error => "failed_to_load_plugin_app", + error_msg => "failed_to_load_plugin_app", name => AppName, reason => Reason }) @@ -710,7 +828,7 @@ start_app(App) -> ok; {error, {ErrApp, Reason}} -> throw(#{ - error => "failed_to_start_plugin_app", + error_msg => "failed_to_start_plugin_app", app => App, err_app => ErrApp, reason => Reason @@ -775,7 +893,7 @@ stop_app(App) -> ?SLOG(debug, #{msg => "plugin_not_started", app => App}), ok = unload_moudle_and_app(App); {error, Reason} -> - throw(#{error => "failed_to_stop_app", app => App, reason => Reason}) + throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason}) end. unload_moudle_and_app(App) -> @@ -802,94 +920,22 @@ is_needed_by(AppToStop, RunningApp) -> undefined -> false end. -put_config(Key, Value) -> - put_config(Key, Value, _ConfLocation = local). - -put_config(Key, Value, ConfLocation) when is_atom(Key) -> - put_config([Key], Value, ConfLocation); -put_config(Path, Values, _ConfLocation = local) when is_list(Path) -> +do_put_config(Key, Value, ConfLocation) when is_atom(Key) -> + do_put_config([Key], Value, ConfLocation); +do_put_config(Path, Values, _ConfLocation = local) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, %% Already in cluster_rpc, don't use emqx_conf:update, dead calls case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end; -put_config(Path, Values, _ConfLocation = global) when is_list(Path) -> +do_put_config(Path, Values, _ConfLocation = global) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end. -bin_key(Map) when is_map(Map) -> - maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map); -bin_key(List = [#{} | _]) -> - lists:map(fun(M) -> bin_key(M) end, List); -bin_key(Term) -> - Term. - -get_config(Key, Default) when is_atom(Key) -> - get_config([Key], Default); -get_config(Path, Default) -> - emqx_conf:get([?CONF_ROOT | Path], Default). - -install_dir() -> get_config(install_dir, ""). - -put_configured(Configured) -> - put_configured(Configured, _ConfLocation = local). - -put_configured(Configured, ConfLocation) -> - ok = put_config(states, bin_key(Configured), ConfLocation). - -configured() -> - get_config(states, []). - -for_plugins(ActionFun) -> - case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of - [] -> ok; - Errors -> erlang:error(#{function => ActionFun, errors => Errors}) - end. - -for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> - case Fun(NameVsn) of - ok -> []; - {error, Reason} -> [{NameVsn, Reason}] - end; -for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> - ?SLOG(debug, #{ - msg => "plugin_disabled", - name_vsn => NameVsn - }), - []. - -parse_name_vsn(NameVsn) when is_binary(NameVsn) -> - parse_name_vsn(binary_to_list(NameVsn)); -parse_name_vsn(NameVsn) when is_list(NameVsn) -> - case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of - {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn}; - _ -> {error, "bad_name_vsn"} - end. - -pkg_file(NameVsn) -> - filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). - -dir(NameVsn) -> - filename:join([install_dir(), NameVsn]). - -info_file(NameVsn) -> - filename:join([dir(NameVsn), "release.json"]). - -readme_file(NameVsn) -> - filename:join([dir(NameVsn), "README.md"]). - -running_apps() -> - lists:map( - fun({N, _, V}) -> - {N, V} - end, - application:which_applications(infinity) - ). - %%-------------------------------------------------------------------- %% `emqx_config_handler' API %%-------------------------------------------------------------------- @@ -913,3 +959,120 @@ enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) -> ok; enable_disable_plugin(_NameVsn, _Diff) -> ok. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +install_dir() -> + get_config(install_dir, ""). + +put_configured(Configured) -> + put_configured(Configured, _ConfLocation = local). + +put_configured(Configured, ConfLocation) -> + ok = do_put_config(states, bin_key(Configured), ConfLocation). + +configured() -> + get_config(states, []). + +for_plugins(ActionFun) -> + case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of + [] -> ok; + Errors -> erlang:error(#{function => ActionFun, errors => Errors}) + end. + +for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> + case Fun(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; +for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> + ?SLOG(debug, #{ + msg => "plugin_disabled", + name_vsn => NameVsn + }), + []. + +maybe_post_op_after_install(NameVsn) -> + _ = maybe_load_config_schema(NameVsn), + _ = maybe_create_config_dir(NameVsn), + ok. + +maybe_load_config_schema(NameVsn) -> + case read_plugin_avsc(NameVsn) of + {ok, Avsc} -> + case emqx_plugins_serde:add_schema(NameVsn, Avsc) of + ok -> ok; + {error, already_exists} -> ok; + {error, Reason} -> {error, Reason} + end; + {error, Reason} -> + ?SLOG(warning, Reason) + end. + +maybe_create_config_dir(NameVsn) -> + case filelib:ensure_path(plugin_config_dir(NameVsn)) of + ok -> ok; + {error, Reason} -> ?SLOG(warning, Reason) + end. + +write_avro_bin(NameVsn, AvroBin) -> + ok = file:write_file(avro_config_file(NameVsn), AvroBin). + +read_file_fun(Path, ErrMsg) -> + fun() -> + case hocon:load(Path, #{format => richmap}) of + {ok, RichMap} -> + {ok, hocon_maps:ensure_plain(RichMap)}; + {error, Reason} -> + ErrMeta = #{error_msg => ErrMsg, reason => Reason}, + ?SLOG(warning, ErrMeta), + throw(ErrMeta) + end + end. + +%% Directorys +plugin_dir(NameVsn) -> + filename:join([install_dir(), NameVsn]). + +plugin_config_dir(NameVsn) -> + filename:join([plugin_dir(NameVsn), "data", "configs"]). + +%% Files +pkg_file(NameVsn) -> + filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). + +info_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "release.json"]). + +schema_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). + +avro_config_file(NameVsn) -> + filename:join([plugin_config_dir(NameVsn), "config.avro"]). + +i18n_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "i18n.json"]). + +readme_file(NameVsn) -> + filename:join([plugin_dir(NameVsn), "README.md"]). + +running_apps() -> + lists:map( + fun({N, _, V}) -> + {N, V} + end, + application:which_applications(infinity) + ). + +bin_key(Map) when is_map(Map) -> + maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map); +bin_key(List = [#{} | _]) -> + lists:map(fun(M) -> bin_key(M) end, List); +bin_key(Term) -> + Term. + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); +bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl new file mode 100644 index 000000000..a89f16e70 --- /dev/null +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -0,0 +1,274 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_serde). + +-include("emqx_plugins.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([ + start_link/0, + get_serde/1, + add_schema/2, + get_schema/1, + delete_schema/1 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_continue/2, + terminate/2 +]). + +-export([ + decode/2, + encode/2 +]). + +%%------------------------------------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec get_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}. +get_serde(SchemaName) -> + case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of + [] -> + {error, not_found}; + [Serde] -> + {ok, Serde} + end. + +-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. +add_schema(Name, Avsc) -> + case get_serde(Name) of + {ok, _Serde} -> + ?SLOG(warning, #{msg => "plugin_avsc_schema_already_exists", name_vsn => Name}), + {error, already_exists}; + {error, not_found} -> + case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}) of + ok -> + ?SLOG(debug, #{msg => "plugin_avsc_schema_added", name_vsn => Name}), + ok; + {error, Reason} = E -> + ?SLOG(error, #{ + msg => "plugin_avsc_schema_added_failed", + reason => emqx_utils:readable_error_msg(Reason) + }), + E + end + end. + +get_schema(NameVsn) -> + Path = emqx_plugins:schema_file(NameVsn), + case read_avsc_file(Path) of + {ok, Avsc} -> + {ok, Avsc}; + {error, Reason} -> + ?SLOG(warning, Reason), + {error, Reason} + end. + +-spec delete_schema(schema_name()) -> ok | {error, term()}. +delete_schema(NameVsn) -> + case get_serde(NameVsn) of + {ok, _Serde} -> + async_delete_serdes([NameVsn]), + ok; + {error, not_found} -> + {error, not_found} + end. + +-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}. +decode(SerdeName, RawData) -> + with_serde( + "decode_avro_binary", + eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData]) + ). + +-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}. +encode(SerdeName, Data) -> + with_serde( + "encode_avro_data", + eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data]) + ). + +%%------------------------------------------------------------------------------------------------- +%% `gen_server' API +%%------------------------------------------------------------------------------------------------- + +init(_) -> + process_flag(trap_exit, true), + ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, [ + public, ordered_set, {keypos, #plugin_schema_serde.name} + ]), + State = #{}, + SchemasMap = read_plugin_avsc(), + {ok, State, {continue, {build_serdes, SchemasMap}}}. + +handle_continue({build_serdes, SchemasMap}, State) -> + _ = build_serdes(SchemasMap), + {noreply, State}. + +handle_call({build_serdes, {NameVsn, Avsc}}, _From, State) -> + BuildRes = do_build_serde(NameVsn, Avsc), + {reply, BuildRes, State}; +handle_call(_Call, _From, State) -> + {reply, {error, unknown_call}, State}. + +handle_cast({delete_serdes, Names}, State) -> + lists:foreach(fun ensure_serde_absent/1, Names), + {noreply, State}; +handle_cast(_Cast, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +read_plugin_avsc() -> + Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), + lists:foldl( + fun(AvscPath, AccIn) -> + case read_avsc_file(AvscPath) of + {ok, Avsc} -> + [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + AccIn#{to_bin(NameVsn) => Avsc}; + {error, Reason} -> + ?SLOG(warning, Reason), + AccIn + end + end, + _Acc0 = #{}, + filelib:wildcard(Pattern) + ). + +build_serdes(Schemas) -> + maps:foreach(fun do_build_serde/2, Schemas). + +do_build_serde(NameVsn, Avsc) -> + try + Serde = make_serde(NameVsn, Avsc), + true = ets:insert(?PLUGIN_SERDE_TAB, Serde), + ok + catch + Kind:Error:Stacktrace -> + ?SLOG( + error, + #{ + msg => "error_building_plugin_schema_serde", + name => NameVsn, + kind => Kind, + error => Error, + stacktrace => Stacktrace + } + ), + {error, Error} + end. + +make_serde(NameVsn, Avsc) -> + Store0 = avro_schema_store:new([map]), + %% import the schema into the map store with an assigned name + %% if it's a named schema (e.g. struct), then Name is added as alias + Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0), + #plugin_schema_serde{ + name = NameVsn, + eval_context = Store + }. + +ensure_serde_absent(Name) when not is_binary(Name) -> + ensure_serde_absent(to_bin(Name)); +ensure_serde_absent(Name) -> + case get_serde(Name) of + {ok, _Serde} -> + _ = ets:delete(?PLUGIN_SERDE_TAB, Name), + ok; + {error, not_found} -> + ok + end. + +async_delete_serdes(Names) -> + gen_server:cast(?MODULE, {delete_serdes, Names}). + +with_serde(WhichOp, Fun) -> + try + Fun() + catch + throw:Reason -> + ?SLOG(error, Reason#{ + which_op => WhichOp, + reason => emqx_utils:readable_error_msg(Reason) + }), + {error, Reason}; + error:Reason:Stacktrace -> + %% unexpected errors, log stacktrace + ?SLOG(warning, #{ + msg => "plugin_schema_op_failed", + which_op => WhichOp, + exception => Reason, + stacktrace => Stacktrace + }), + {error, #{ + which_op => WhichOp, + reason => Reason + }} + end. + +eval_serde_fun(Op, ErrMsg, SerdeName, Args) -> + fun() -> + case get_serde(SerdeName) of + {ok, Serde} -> + eval_serde(Op, Serde, Args); + {error, not_found} -> + throw(#{ + error_msg => ErrMsg, + reason => plugin_serde_not_found, + serde_name => SerdeName + }) + end + end. + +eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> + Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), + {ok, avro_binary_decoder:decode(Data, Name, Store, Opts)}; +eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> + {ok, avro_binary_encoder:encode(Store, Name, Data)}; +eval_serde(_, _, _) -> + throw(#{error_msg => "unexpected_plugin_avro_op"}). + +read_avsc_file(Path) -> + case file:read_file(Path) of + {ok, Bin} -> + {ok, Bin}; + {error, _} -> + {error, #{ + error => "failed_to_read_plugin_schema", + path => Path + }} + end. + +to_bin(A) when is_atom(A) -> atom_to_binary(A); +to_bin(L) when is_list(L) -> iolist_to_binary(L); +to_bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_plugins/src/emqx_plugins_sup.erl b/apps/emqx_plugins/src/emqx_plugins_sup.erl index f2092fb28..9f396c14d 100644 --- a/apps/emqx_plugins/src/emqx_plugins_sup.erl +++ b/apps/emqx_plugins/src/emqx_plugins_sup.erl @@ -32,4 +32,14 @@ init([]) -> intensity => 100, period => 10 }, - {ok, {SupFlags, []}}. + ChildSpecs = [child_spec(emqx_plugins_serde)], + {ok, {SupFlags, ChildSpecs}}. + +child_spec(Mod) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5_000, + type => worker + }. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 0f66e20dc..d7bdfad13 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -346,7 +346,7 @@ t_enable_disable(Config) -> ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()), ?assertMatch( {error, #{ - reason := "bad_plugin_config_status", + error_msg := "bad_plugin_config_status", hint := "disable_the_plugin_first" }}, emqx_plugins:ensure_uninstalled(NameVsn) @@ -374,15 +374,15 @@ t_bad_tar_gz(Config) -> ok = file:write_file(FakeTarTz, "a\n"), ?assertMatch( {error, #{ - reason := "bad_plugin_package", - return := eof + error_msg := "bad_plugin_package", + reason := eof }}, emqx_plugins:ensure_installed("fake-vsn") ), ?assertMatch( {error, #{ - reason := "failed_to_extract_plugin_package", - return := not_found + error_msg := "failed_to_extract_plugin_package", + reason := not_found }}, emqx_plugins:ensure_installed("nonexisting") ), @@ -412,7 +412,7 @@ t_bad_tar_gz2(Config) -> ?assert(filelib:is_regular(TarGz)), %% failed to install, it also cleans up the bad content of .tar.gz file ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))), %% but the tar.gz file is still around ?assert(filelib:is_regular(TarGz)), ok. @@ -440,8 +440,8 @@ t_tar_vsn_content_mismatch(Config) -> %% failed to install, it also cleans up content of the bad .tar.gz file even %% if in other directory ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))), - ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir("foo-0.2"))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))), + ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir("foo-0.2"))), %% the tar.gz file is still around ?assert(filelib:is_regular(TarGz)), ok. @@ -455,15 +455,15 @@ t_bad_info_json(Config) -> ok = write_info_file(Config, NameVsn, "bad-syntax"), ?assertMatch( {error, #{ - error := "bad_info_file", - return := {parse_error, _} + error_msg := "bad_info_file", + reason := {parse_error, _} }}, emqx_plugins:describe(NameVsn) ), ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"), ?assertMatch( {error, #{ - error := "bad_info_file_content", + error_msg := "bad_info_file_content", mandatory_fields := _ }}, emqx_plugins:describe(NameVsn) @@ -499,7 +499,7 @@ t_elixir_plugin(Config) -> ok = emqx_plugins:ensure_installed(NameVsn), %% idempotent ok = emqx_plugins:ensure_installed(NameVsn), - {ok, Info} = emqx_plugins:read_plugin(NameVsn, #{}), + {ok, Info} = emqx_plugins:read_plugin_info(NameVsn, #{}), ?assertEqual([Info], emqx_plugins:list()), %% start ok = emqx_plugins:ensure_started(NameVsn), diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index d064cba9d..010d96de9 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -57,7 +57,7 @@ read_plugin_test() -> ok = write_file(InfoFile, FakeInfo), ?assertMatch( {error, #{error := "bad_rel_apps"}}, - emqx_plugins:read_plugin(NameVsn, #{}) + emqx_plugins:read_plugin_info(NameVsn, #{}) ) after emqx_plugins:purge(NameVsn) @@ -109,7 +109,7 @@ purge_test() -> with_rand_install_dir( fun(_Dir) -> File = emqx_plugins:info_file("a-1"), - Dir = emqx_plugins:dir("a-1"), + Dir = emqx_plugins:plugin_dir("a-1"), ok = filelib:ensure_dir(File), ?assertMatch({ok, _}, file:read_file_info(Dir)), ?assertEqual(ok, emqx_plugins:purge("a-1")),