Merge pull request #12910 from JimMoen/EMQX-12187/feat-plugin-config-ui

feat(plugin): avro config schema store and encode/decode
This commit is contained in:
JimMoen 2024-04-27 00:51:58 +08:00 committed by GitHub
commit a3320ab51b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1114 additions and 349 deletions

View File

@ -46,6 +46,7 @@
{emqx_metrics,2}.
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_api_plugins,3}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}.
{emqx_mgmt_cluster,3}.

View File

@ -72,7 +72,7 @@ dashboard_addr(desc) -> ?DESC(dashboard_addr);
dashboard_addr(default) -> <<"https://127.0.0.1:18083">>;
dashboard_addr(_) -> undefined.
%% TOOD: support raw xml metadata in hocon (maybe?🤔)
%% TODO: support raw xml metadata in hocon (maybe?🤔)
idp_metadata_url(type) -> binary();
idp_metadata_url(desc) -> ?DESC(idp_metadata_url);
idp_metadata_url(default) -> <<"https://idp.example.com">>;

View File

@ -19,7 +19,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
%%-include_lib("emqx_plugins/include/emqx_plugins.hrl").
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
-export([
api_spec/0,
@ -34,6 +34,8 @@
upload_install/2,
plugin/2,
update_plugin/2,
plugin_config/2,
plugin_schema/2,
update_boot_order/2
]).
@ -43,7 +45,8 @@
install_package/2,
delete_package/1,
describe_package/1,
ensure_action/2
ensure_action/2,
do_update_plugin_config/3
]).
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$").
@ -52,7 +55,11 @@
%% app_name must be a snake_case (no '-' allowed).
-define(VSN_WILDCARD, "-*.tar.gz").
namespace() -> "plugins".
-define(CONTENT_PLUGIN, plugin).
-define(CONTENT_CONFIG, config).
namespace() ->
"plugins".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -64,6 +71,8 @@ paths() ->
"/plugins/:name",
"/plugins/install",
"/plugins/:name/:action",
"/plugins/:name/config",
"/plugins/:name/schema",
"/plugins/:name/move"
].
@ -97,15 +106,15 @@ schema("/plugins/install") ->
schema => #{
type => object,
properties => #{
plugin => #{type => string, format => binary}
?CONTENT_PLUGIN => #{type => string, format => binary}
}
},
encoding => #{plugin => #{'contentType' => 'application/gzip'}}
encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}}
}
}
},
responses => #{
200 => <<"OK">>,
204 => <<"Install plugin successfully">>,
400 => emqx_dashboard_swagger:error_codes(
['UNEXPECTED_ERROR', 'ALREADY_INSTALLED', 'BAD_PLUGIN_INFO']
)
@ -117,7 +126,7 @@ schema("/plugins/:name") ->
'operationId' => plugin,
get => #{
summary => <<"Get a plugin description">>,
description => "Describs plugin according to its `release.json` and `README.md`.",
description => "Describe a plugin according to its `release.json` and `README.md`.",
tags => ?TAGS,
parameters => [hoconsc:ref(name)],
responses => #{
@ -152,22 +161,80 @@ schema("/plugins/:name/:action") ->
{action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}
],
responses => #{
200 => <<"OK">>,
204 => <<"Trigger action successfully">>,
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
}
}
};
schema("/plugins/:name/config") ->
#{
'operationId' => plugin_config,
get => #{
summary => <<"Get plugin config">>,
description =>
"Get plugin config. Config schema is defined by user's schema.avsc file.<br/>",
tags => ?TAGS,
parameters => [hoconsc:ref(name)],
responses => #{
%% avro data, json encoded
200 => hoconsc:mk(binary()),
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
}
},
put => #{
summary =>
<<"Update plugin config">>,
description =>
"Update plugin config. Config schema defined by user's schema.avsc file.<br/>",
tags => ?TAGS,
parameters => [hoconsc:ref(name)],
'requestBody' => #{
content => #{
'application/json' => #{
schema => #{
type => object
}
}
}
},
responses => #{
204 => <<"Config updated successfully">>,
400 => emqx_dashboard_swagger:error_codes(
['BAD_CONFIG', 'UNEXPECTED_ERROR'], <<"Update plugin config failed">>
),
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
}
}
};
schema("/plugins/:name/schema") ->
#{
'operationId' => plugin_schema,
get => #{
summary => <<"Get installed plugin's AVRO schema">>,
description => "Get plugin's config AVRO schema.",
tags => ?TAGS,
parameters => [hoconsc:ref(name)],
responses => #{
%% avro schema and i18n json object
200 => hoconsc:mk(binary()),
404 => emqx_dashboard_swagger:error_codes(
['NOT_FOUND', 'FILE_NOT_EXISTED'],
<<"Plugin Not Found or Plugin not given a schema file">>
)
}
}
};
schema("/plugins/:name/move") ->
#{
'operationId' => update_boot_order,
post => #{
summary => <<"Move plugin within plugin hiearchy">>,
summary => <<"Move plugin within plugin hierarchy">>,
description => "Setting the boot order of plugins.",
tags => ?TAGS,
parameters => [hoconsc:ref(name)],
'requestBody' => move_request_body(),
responses => #{
200 => <<"OK">>,
204 => <<"Boot order changed successfully">>,
400 => emqx_dashboard_swagger:error_codes(['MOVE_FAILED'], <<"Move failed">>)
}
}
@ -338,7 +405,7 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
NameVsn = string:trim(FileName, trailing, ".tar.gz"),
case emqx_plugins:describe(NameVsn) of
{error, #{error := "bad_info_file", return := {enoent, _}}} ->
{error, #{error_msg := "bad_info_file", reason := {enoent, _}}} ->
case emqx_plugins:parse_name_vsn(FileName) of
{ok, AppName, _Vsn} ->
AppDir = filename:join(emqx_plugins:install_dir(), AppName),
@ -382,7 +449,7 @@ do_install_package(FileName, Bin) ->
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of
[] ->
{200};
{204};
Filtered ->
%% crash if we have unexpected errors or results
[] = lists:filter(
@ -394,7 +461,7 @@ do_install_package(FileName, Bin) ->
),
Reason =
case hd(Filtered) of
{error, #{error := Reason0}} -> Reason0;
{error, #{error_msg := Reason0}} -> Reason0;
{error, #{reason := Reason0}} -> Reason0
end,
{400, #{
@ -418,6 +485,50 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
return(204, Res).
plugin_config(get, #{bindings := #{name := NameVsn}}) ->
case emqx_plugins:describe(NameVsn) of
{ok, _} ->
case emqx_plugins:get_config(NameVsn) of
{ok, AvroJson} ->
{200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson};
{error, _} ->
{400, #{
code => 'BAD_CONFIG',
message => <<"Failed to get plugin config">>
}}
end;
_ ->
{404, plugin_not_found_msg()}
end;
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
case emqx_plugins:describe(NameVsn) of
{ok, _} ->
case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of
{ok, AvroValueConfig} ->
Nodes = emqx:running_nodes(),
%% cluster call with config in map (binary key-value)
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
Nodes, NameVsn, AvroJsonMap, AvroValueConfig
),
{204};
{error, Reason} ->
{400, #{
code => 'BAD_CONFIG',
message => readable_error_msg(Reason)
}}
end;
_ ->
{404, plugin_not_found_msg()}
end.
plugin_schema(get, #{bindings := #{name := NameVsn}}) ->
case emqx_plugins:describe(NameVsn) of
{ok, _Plugin} ->
{200, format_plugin_avsc_and_i18n(NameVsn)};
_ ->
{404, plugin_not_found_msg()}
end.
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
case parse_position(Body, Name) of
{error, Reason} ->
@ -425,11 +536,11 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
Position ->
case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of
ok ->
{200};
{204};
{error, Reason} ->
{400, #{
code => 'MOVE_FAILED',
message => iolist_to_binary(io_lib:format("~p", [Reason]))
message => readable_error_msg(Reason)
}}
end
end.
@ -443,7 +554,7 @@ install_package(FileName, Bin) ->
ok = file:write_file(File, Bin),
PackageName = string:trim(FileName, trailing, ".tar.gz"),
case emqx_plugins:ensure_installed(PackageName) of
{error, #{return := not_found}} = NotFound ->
{error, #{reason := not_found}} = NotFound ->
NotFound;
{error, Reason} = Error ->
?SLOG(error, Reason#{msg => "failed_to_install_plugin"}),
@ -454,9 +565,9 @@ install_package(FileName, Bin) ->
end.
%% For RPC plugin get
describe_package(Name) ->
describe_package(NameVsn) ->
Node = node(),
case emqx_plugins:describe(Name) of
case emqx_plugins:describe(NameVsn) of
{ok, Plugin} -> {Node, [Plugin]};
_ -> {Node, []}
end.
@ -487,12 +598,32 @@ ensure_action(Name, restart) ->
_ = emqx_plugins:restart(Name),
ok.
%% for RPC plugin avro encoded config update
do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) ->
%% TODO: maybe use `PluginConfigMap` to validate config
emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
return(Code, ok) ->
{Code};
return(_, {error, #{error := "bad_info_file", return := {enoent, _} = Reason}}) ->
{404, #{code => 'NOT_FOUND', message => iolist_to_binary(io_lib:format("~p", [Reason]))}};
return(_, {error, #{error_msg := "bad_info_file", reason := {enoent, _} = Reason}}) ->
{404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = Reason}}) ->
{404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
return(_, {error, Reason}) ->
{400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}.
{400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}.
plugin_not_found_msg() ->
#{
code => 'NOT_FOUND',
message => <<"Plugin Not Found">>
}.
readable_error_msg(Msg) ->
emqx_utils:readable_error_msg(Msg).
parse_position(#{<<"position">> := <<"front">>}, _) ->
front;
@ -563,6 +694,18 @@ aggregate_status([{Node, Plugins} | List], Acc) ->
),
aggregate_status(List, NewAcc).
format_plugin_avsc_and_i18n(NameVsn) ->
#{
avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end),
i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end)
}.
try_read_file(Fun) ->
case Fun() of
{ok, Json} -> Json;
_ -> null
end.
% running_status: running loaded, stopped
%% config_status: not_configured disable enable
plugin_status(#{running_status := running}) -> running;

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_plugins_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_plugins/1,
install_package/3,
describe_package/2,
delete_package/1,
ensure_action/2,
update_plugin_config/4
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.7.0".
-spec get_plugins([node()]) -> emqx_rpc:multicall_result().
get_plugins(Nodes) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000).
-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result().
install_package(Nodes, Filename, Bin) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).
-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result().
describe_package(Nodes, Name) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000).
-spec delete_package(binary() | string()) -> ok | {error, any()}.
delete_package(Name) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
ensure_action(Name, Action) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).
-spec update_plugin_config(
[node()],
binary() | string(),
binary(),
map()
) ->
emqx_rpc:multicall_result().
update_plugin_config(Nodes, NameVsn, AvroJsonMap, PluginConfig) ->
rpc:multicall(
Nodes,
emqx_mgmt_api_plugins,
do_update_plugin_config,
[NameVsn, AvroJsonMap, PluginConfig],
10000
).

View File

@ -37,10 +37,10 @@ init_per_suite(Config) ->
ok = filelib:ensure_dir(WorkDir),
DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"),
DemoShDir = lists:flatten(string:replace(DemoShDir1, "emqx_management", "emqx_plugins")),
OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
OrigInstallDir = emqx_plugins:get_config_interal(install_dir, undefined),
ok = filelib:ensure_dir(DemoShDir),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]),
emqx_plugins:put_config(install_dir, DemoShDir),
emqx_plugins:put_config_internal(install_dir, DemoShDir),
[{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config].
end_per_suite(Config) ->
@ -48,7 +48,7 @@ end_per_suite(Config) ->
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
OrigInstallDir -> emqx_plugins:put_config_internal(install_dir, OrigInstallDir)
end,
emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
ok.
@ -271,7 +271,7 @@ install_plugin(FilePath) ->
Token
)
of
{ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok;
{ok, {{"HTTP/1.1", 204, "No Content"}, _Headers, <<>>}} -> ok;
Error -> Error
end.
@ -288,7 +288,7 @@ install_plugin(Config, FilePath) ->
Auth
)
of
{ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok;
{ok, {{"HTTP/1.1", 204, "No Content"}, _Headers, <<>>}} -> ok;
Error -> Error
end.

View File

@ -19,4 +19,25 @@
-define(CONF_ROOT, plugins).
-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab).
-define(CONFIG_FORMAT_AVRO, config_format_avro).
-define(CONFIG_FORMAT_MAP, config_format_map).
-type schema_name() :: binary().
-type avsc() :: binary().
-type encoded_data() :: iodata().
-type decoded_data() :: map().
-record(plugin_schema_serde, {
name :: schema_name(),
eval_context :: term(),
%% TODO: fields to mark schema import status
%% scheam_imported :: boolean(),
%% for future use
extra = []
}).
-type plugin_schema_serde() :: #plugin_schema_serde{}.
-endif.

View File

@ -1,5 +1,8 @@
%% -*- mode: erlang -*-
{deps, [{emqx, {path, "../emqx"}}]}.
{deps, [
{emqx, {path, "../emqx"}},
{erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}}
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,9 +1,9 @@
%% -*- mode: erlang -*-
{application, emqx_plugins, [
{description, "EMQX Plugin Management"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{modules, []},
{mod, {emqx_plugins_app, []}},
{applications, [kernel, stdlib, emqx]},
{applications, [kernel, stdlib, emqx, erlavro]},
{env, []}
]}.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,279 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugins_serde).
-include("emqx_plugins.hrl").
-include_lib("emqx/include/logger.hrl").
%% API
-export([
start_link/0,
lookup_serde/1,
add_schema/2,
get_schema/1,
delete_schema/1
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_continue/2,
terminate/2
]).
-export([
decode/2,
encode/2
]).
%%-------------------------------------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec lookup_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}.
lookup_serde(SchemaName) ->
case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of
[] ->
{error, not_found};
[Serde] ->
{ok, Serde}
end.
-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}.
add_schema(NameVsn, Path) ->
case lookup_serde(NameVsn) of
{ok, _Serde} ->
?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}),
{error, already_exists};
{error, not_found} ->
case gen_server:call(?MODULE, {build_serdes, to_bin(NameVsn), Path}, infinity) of
ok ->
?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}),
ok;
{error, Reason} = E ->
?SLOG(error, #{
msg => "plugin_schema_add_failed",
plugin => NameVsn,
reason => emqx_utils:readable_error_msg(Reason)
}),
E
end
end.
get_schema(NameVsn) ->
Path = emqx_plugins:avsc_file_path(NameVsn),
case read_avsc_file(Path) of
{ok, Avsc} ->
{ok, Avsc};
{error, Reason} ->
?SLOG(warning, Reason),
{error, Reason}
end.
-spec delete_schema(schema_name()) -> ok | {error, term()}.
delete_schema(NameVsn) ->
case lookup_serde(NameVsn) of
{ok, _Serde} ->
async_delete_serdes([NameVsn]),
ok;
{error, not_found} ->
{error, not_found}
end.
-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
decode(SerdeName, RawData) ->
with_serde(
?FUNCTION_NAME,
SerdeName,
[RawData]
).
-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
encode(SerdeName, Data) ->
with_serde(
?FUNCTION_NAME,
SerdeName,
[Data]
).
%%-------------------------------------------------------------------------------------------------
%% `gen_server' API
%%-------------------------------------------------------------------------------------------------
init(_) ->
process_flag(trap_exit, true),
ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, [
public, ordered_set, {keypos, #plugin_schema_serde.name}
]),
State = #{},
AvscPaths = get_plugin_avscs(),
{ok, State, {continue, {build_serdes, AvscPaths}}}.
handle_continue({build_serdes, AvscPaths}, State) ->
_ = build_serdes(AvscPaths),
{noreply, State}.
handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
BuildRes = do_build_serde({NameVsn, AvscPath}),
{reply, BuildRes, State};
handle_call(_Call, _From, State) ->
{reply, {error, unknown_call}, State}.
handle_cast({delete_serdes, Names}, State) ->
lists:foreach(fun ensure_serde_absent/1, Names),
{noreply, State};
handle_cast(_Cast, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
-spec get_plugin_avscs() -> [{string(), string()}].
get_plugin_avscs() ->
Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]),
lists:foldl(
fun(AvscPath, AccIn) ->
[_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
[{NameVsn, AvscPath} | AccIn]
end,
_Acc0 = [],
filelib:wildcard(Pattern)
).
build_serdes(AvscPaths) ->
ok = lists:foreach(fun do_build_serde/1, AvscPaths).
do_build_serde({NameVsn, AvscPath}) ->
try
Serde = make_serde(NameVsn, AvscPath),
true = ets:insert(?PLUGIN_SERDE_TAB, Serde),
ok
catch
Kind:Error:Stacktrace ->
?SLOG(
error,
#{
msg => "error_building_plugin_schema_serde",
name => NameVsn,
kind => Kind,
error => Error,
stacktrace => Stacktrace
}
),
{error, Error}
end.
make_serde(NameVsn, AvscPath) ->
{ok, AvscBin} = read_avsc_file(AvscPath),
Store0 = avro_schema_store:new([map]),
%% import the schema into the map store with an assigned name
%% if it's a named schema (e.g. struct), then Name is added as alias
Store = avro_schema_store:import_schema_json(NameVsn, AvscBin, Store0),
#plugin_schema_serde{
name = NameVsn,
eval_context = Store
}.
ensure_serde_absent(Name) when not is_binary(Name) ->
ensure_serde_absent(to_bin(Name));
ensure_serde_absent(Name) ->
case lookup_serde(Name) of
{ok, _Serde} ->
_ = ets:delete(?PLUGIN_SERDE_TAB, Name),
ok;
{error, not_found} ->
ok
end.
async_delete_serdes(Names) ->
gen_server:cast(?MODULE, {delete_serdes, Names}).
with_serde(Op, SerdeName, Args) ->
WhichOp = which_op(Op),
ErrMsg = error_msg(Op),
try
eval_serde(Op, ErrMsg, SerdeName, Args)
catch
throw:Reason ->
?SLOG(error, Reason#{
which_op => WhichOp,
reason => emqx_utils:readable_error_msg(Reason)
}),
{error, Reason};
error:Reason:Stacktrace ->
%% unexpected errors, log stacktrace
?SLOG(warning, #{
msg => "plugin_schema_op_failed",
which_op => WhichOp,
exception => Reason,
stacktrace => Stacktrace
}),
{error, #{
which_op => WhichOp,
reason => Reason
}}
end.
eval_serde(Op, ErrMsg, SerdeName, Args) ->
case lookup_serde(SerdeName) of
{ok, Serde} ->
eval_serde(Op, Serde, Args);
{error, not_found} ->
throw(#{
error_msg => ErrMsg,
reason => plugin_serde_not_found,
serde_name => SerdeName
})
end.
eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}, {encoding, avro_json}]),
{ok, avro_json_decoder:decode_value(Data, Name, Store, Opts)};
eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
{ok, avro_json_encoder:encode(Store, Name, Data)};
eval_serde(_, _, _) ->
throw(#{error_msg => "unexpected_plugin_avro_op"}).
which_op(Op) ->
atom_to_list(Op) ++ "_avro_json".
error_msg(Op) ->
atom_to_list(Op) ++ "_avro_data".
read_avsc_file(Path) ->
case file:read_file(Path) of
{ok, Bin} ->
{ok, Bin};
{error, _} ->
{error, #{
error => "failed_to_read_plugin_schema",
path => Path
}}
end.
to_bin(A) when is_atom(A) -> atom_to_binary(A);
to_bin(L) when is_list(L) -> iolist_to_binary(L);
to_bin(B) when is_binary(B) -> B.

View File

@ -32,4 +32,14 @@ init([]) ->
intensity => 100,
period => 10
},
{ok, {SupFlags, []}}.
ChildSpecs = [child_spec(emqx_plugins_serde)],
{ok, {SupFlags, ChildSpecs}}.
child_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker
}.

View File

@ -346,7 +346,7 @@ t_enable_disable(Config) ->
?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
?assertMatch(
{error, #{
reason := "bad_plugin_config_status",
error_msg := "bad_plugin_config_status",
hint := "disable_the_plugin_first"
}},
emqx_plugins:ensure_uninstalled(NameVsn)
@ -374,15 +374,15 @@ t_bad_tar_gz(Config) ->
ok = file:write_file(FakeTarTz, "a\n"),
?assertMatch(
{error, #{
reason := "bad_plugin_package",
return := eof
error_msg := "bad_plugin_package",
reason := eof
}},
emqx_plugins:ensure_installed("fake-vsn")
),
?assertMatch(
{error, #{
reason := "failed_to_extract_plugin_package",
return := not_found
error_msg := "failed_to_extract_plugin_package",
reason := not_found
}},
emqx_plugins:ensure_installed("nonexisting")
),
@ -412,7 +412,7 @@ t_bad_tar_gz2(Config) ->
?assert(filelib:is_regular(TarGz)),
%% failed to install, it also cleans up the bad content of .tar.gz file
?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))),
%% but the tar.gz file is still around
?assert(filelib:is_regular(TarGz)),
ok.
@ -440,8 +440,8 @@ t_tar_vsn_content_mismatch(Config) ->
%% failed to install, it also cleans up content of the bad .tar.gz file even
%% if in other directory
?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir("foo-0.2"))),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir("foo-0.2"))),
%% the tar.gz file is still around
?assert(filelib:is_regular(TarGz)),
ok.
@ -455,15 +455,15 @@ t_bad_info_json(Config) ->
ok = write_info_file(Config, NameVsn, "bad-syntax"),
?assertMatch(
{error, #{
error := "bad_info_file",
return := {parse_error, _}
error_msg := "bad_info_file",
reason := {parse_error, _}
}},
emqx_plugins:describe(NameVsn)
),
ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"),
?assertMatch(
{error, #{
error := "bad_info_file_content",
error_msg := "bad_info_file_content",
mandatory_fields := _
}},
emqx_plugins:describe(NameVsn)
@ -499,7 +499,7 @@ t_elixir_plugin(Config) ->
ok = emqx_plugins:ensure_installed(NameVsn),
%% idempotent
ok = emqx_plugins:ensure_installed(NameVsn),
{ok, Info} = emqx_plugins:read_plugin(NameVsn, #{}),
{ok, Info} = emqx_plugins:read_plugin_info(NameVsn, #{}),
?assertEqual([Info], emqx_plugins:list()),
%% start
ok = emqx_plugins:ensure_started(NameVsn),
@ -626,9 +626,9 @@ group_t_copy_plugin_to_a_new_node({init, Config}) ->
}
),
[CopyFromNode] = emqx_cth_cluster:start([SpecCopyFrom#{join_to => undefined}]),
ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]),
ok = rpc:call(CopyFromNode, emqx_plugins, put_config_internal, [install_dir, FromInstallDir]),
[CopyToNode] = emqx_cth_cluster:start([SpecCopyTo#{join_to => undefined}]),
ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]),
ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [install_dir, ToInstallDir]),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]),
ok = rpc:call(CopyFromNode, emqx_plugins, ensure_started, [NameVsn]),
@ -658,7 +658,7 @@ group_t_copy_plugin_to_a_new_node(Config) ->
CopyFromNode = proplists:get_value(copy_from_node, Config),
CopyToNode = proplists:get_value(copy_to_node, Config),
CopyToDir = proplists:get_value(to_install_dir, Config),
CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config, [[states], []]),
CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config_interal, [[states], []]),
NameVsn = proplists:get_value(name_vsn, Config),
PluginName = proplists:get_value(plugin_name, Config),
PluginApp = list_to_atom(PluginName),
@ -681,7 +681,7 @@ group_t_copy_plugin_to_a_new_node(Config) ->
),
ok = rpc:call(CopyToNode, ekka, join, [CopyFromNode]),
%% Mimic cluster-override conf copying
ok = rpc:call(CopyToNode, emqx_plugins, put_config, [[states], CopyFromPluginsState]),
ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]),
%% Plugin copying is triggered upon app restart on a new node.
%% This is similar to emqx_conf, which copies cluster-override conf upon start,
%% see: emqx_conf_app:init_conf/0
@ -734,7 +734,7 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
%% successfully even if it's not extracted yet. Simply starting
%% the node would crash if not working properly.
ct:pal("~p config:\n ~p", [
CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config, [[], #{}])
CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config_interal, [[], #{}])
]),
ct:pal("~p install_dir:\n ~p", [
CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir])

View File

@ -16,6 +16,7 @@
-module(emqx_plugins_tests).
-include("emqx_plugins.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(nowarn_export_all).
@ -28,20 +29,20 @@ ensure_configured_test_todo() ->
after
emqx_plugins:put_configured([])
end,
meck:unload(emqx).
unmeck_emqx().
test_ensure_configured() ->
ok = emqx_plugins:put_configured([]),
P1 = #{name_vsn => "p-1", enable => true},
P2 = #{name_vsn => "p-2", enable => true},
P3 = #{name_vsn => "p-3", enable => false},
emqx_plugins:ensure_configured(P1, front),
emqx_plugins:ensure_configured(P2, {before, <<"p-1">>}),
emqx_plugins:ensure_configured(P3, {before, <<"p-1">>}),
emqx_plugins:ensure_configured(P1, front, local),
emqx_plugins:ensure_configured(P2, {before, <<"p-1">>}, local),
emqx_plugins:ensure_configured(P3, {before, <<"p-1">>}, local),
?assertEqual([P2, P3, P1], emqx_plugins:configured()),
?assertThrow(
#{error := "position_anchor_plugin_not_configured"},
emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>})
emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>}, local)
).
read_plugin_test() ->
@ -49,34 +50,34 @@ read_plugin_test() ->
with_rand_install_dir(
fun(_Dir) ->
NameVsn = "bar-5",
InfoFile = emqx_plugins:info_file(NameVsn),
InfoFile = emqx_plugins:info_file_path(NameVsn),
FakeInfo =
"name=bar, rel_vsn=\"5\", rel_apps=[justname_no_vsn],"
"description=\"desc bar\"",
try
ok = write_file(InfoFile, FakeInfo),
?assertMatch(
{error, #{error := "bad_rel_apps"}},
emqx_plugins:read_plugin(NameVsn, #{})
{error, #{error_msg := "bad_rel_apps"}},
emqx_plugins:read_plugin_info(NameVsn, #{})
)
after
emqx_plugins:purge(NameVsn)
end
end
),
meck:unload(emqx).
unmeck_emqx().
with_rand_install_dir(F) ->
N = rand:uniform(10000000),
TmpDir = integer_to_list(N),
OriginalInstallDir = emqx_plugins:install_dir(),
ok = filelib:ensure_dir(filename:join([TmpDir, "foo"])),
ok = emqx_plugins:put_config(install_dir, TmpDir),
ok = emqx_plugins:put_config_internal(install_dir, TmpDir),
try
F(TmpDir)
after
file:del_dir_r(TmpDir),
ok = emqx_plugins:put_config(install_dir, OriginalInstallDir)
ok = emqx_plugins:put_config_internal(install_dir, OriginalInstallDir)
end.
write_file(Path, Content) ->
@ -90,7 +91,7 @@ delete_package_test() ->
meck_emqx(),
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:pkg_file("a-1"),
File = emqx_plugins:pkg_file_path("a-1"),
ok = write_file(File, "a"),
ok = emqx_plugins:delete_package("a-1"),
%% delete again should be ok
@ -100,7 +101,7 @@ delete_package_test() ->
?assertMatch({error, _}, emqx_plugins:delete_package("a-1"))
end
),
meck:unload(emqx).
unmeck_emqx().
%% purge plugin's install dir should mostly work and return ok
%% but it may fail in case the dir is read-only
@ -108,8 +109,8 @@ purge_test() ->
meck_emqx(),
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:info_file("a-1"),
Dir = emqx_plugins:dir("a-1"),
File = emqx_plugins:info_file_path("a-1"),
Dir = emqx_plugins:plugin_dir("a-1"),
ok = filelib:ensure_dir(File),
?assertMatch({ok, _}, file:read_file_info(Dir)),
?assertEqual(ok, emqx_plugins:purge("a-1")),
@ -120,10 +121,11 @@ purge_test() ->
?assertEqual(ok, emqx_plugins:purge("a-1"))
end
),
meck:unload(emqx).
unmeck_emqx().
meck_emqx() ->
meck:new(emqx, [unstick, passthrough]),
meck:new(emqx_plugins_serde),
meck:expect(
emqx,
update_config,
@ -131,4 +133,14 @@ meck_emqx() ->
emqx_config:put(Path, Values)
end
),
meck:expect(
emqx_plugins_serde,
delete_schema,
fun(_NameVsn) -> ok end
),
ok.
unmeck_emqx() ->
meck:unload(emqx),
meck:unload(emqx_plugins_serde),
ok.

15
changes/feat-12910.en.md Normal file
View File

@ -0,0 +1,15 @@
Provided a configuration API endpoint for plugin functionality.
This allows users to describe the configuration struct of their plugins using AVRO schema.
During plugin runtime, the plugin's configuration can be accessed via the API.
Added new API endpoints:
- `/plugins/:name/schema`
To get plugins avro schema and i18n config in one json object.
- `/plugins/:name/config`
To get or update plugin's own config
Changed API endpoints:
- `/plugins/install`
Status code when succeeded change to `204`. It was `200` previously.
- `/plugins/:name/move`
Status code when succeeded change to `204`. It was `200` previously.