refactor: plguin functions and types rename
This commit is contained in:
parent
33aa61daea
commit
e5f7aa9817
|
@ -508,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) ->
|
|||
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
|
||||
case emqx_plugins:describe(NameVsn) of
|
||||
{ok, _} ->
|
||||
case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of
|
||||
case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
|
||||
{ok, AvroValueConfig} ->
|
||||
Nodes = emqx:running_nodes(),
|
||||
%% cluster call with config in map (binary key-value)
|
||||
|
@ -702,8 +702,8 @@ aggregate_status([{Node, Plugins} | List], Acc) ->
|
|||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
format_plugin_avsc_and_i18n(NameVsn) ->
|
||||
#{
|
||||
avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end),
|
||||
i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end)
|
||||
avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end),
|
||||
i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end)
|
||||
}.
|
||||
|
||||
try_read_file(Fun) ->
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab).
|
||||
|
||||
-define(CONFIG_FORMAT_AVRO, config_format_avro).
|
||||
-define(CONFIG_FORMAT_BIN, config_format_bin).
|
||||
-define(CONFIG_FORMAT_MAP, config_format_map).
|
||||
|
||||
-define(plugin_conf_not_found, plugin_conf_not_found).
|
||||
|
@ -32,6 +32,16 @@
|
|||
-type encoded_data() :: iodata().
|
||||
-type decoded_data() :: map().
|
||||
|
||||
%% "my_plugin-0.1.0"
|
||||
-type name_vsn() :: binary() | string().
|
||||
%% the parse result of the JSON info file
|
||||
-type plugin_info() :: map().
|
||||
-type schema_json_map() :: map().
|
||||
-type i18n_json_map() :: map().
|
||||
-type raw_plugin_config_content() :: binary().
|
||||
-type plugin_config_map() :: map().
|
||||
-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
|
||||
|
||||
-record(plugin_schema_serde, {
|
||||
name :: schema_name(),
|
||||
eval_context :: term(),
|
||||
|
|
|
@ -28,9 +28,9 @@
|
|||
|
||||
-export([
|
||||
describe/1,
|
||||
plugin_avsc/1,
|
||||
plugin_i18n/1,
|
||||
plugin_avro/1,
|
||||
plugin_schema_json/1,
|
||||
plugin_i18n_json/1,
|
||||
raw_plugin_config_content/1,
|
||||
parse_name_vsn/1,
|
||||
make_name_vsn_string/2
|
||||
]).
|
||||
|
@ -69,7 +69,7 @@
|
|||
|
||||
%% Package utils
|
||||
-export([
|
||||
decode_plugin_avro_config/2,
|
||||
decode_plugin_config_map/2,
|
||||
install_dir/0,
|
||||
avsc_file_path/1,
|
||||
with_plugin_avsc/1
|
||||
|
@ -85,7 +85,7 @@
|
|||
|
||||
%% Internal export
|
||||
-export([
|
||||
ensure_avro_config/1,
|
||||
ensure_config_map/1,
|
||||
do_ensure_started/1
|
||||
]).
|
||||
%% for test cases
|
||||
|
@ -104,36 +104,26 @@
|
|||
|
||||
-define(MAX_KEEP_BACKUP_CONFIGS, 10).
|
||||
|
||||
%% "my_plugin-0.1.0"
|
||||
-type name_vsn() :: binary() | string().
|
||||
%% the parse result of the JSON info file
|
||||
-type plugin() :: map().
|
||||
-type schema_json() :: map().
|
||||
-type i18n_json() :: map().
|
||||
-type avro_binary() :: binary().
|
||||
-type plugin_config() :: map().
|
||||
-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Describe a plugin.
|
||||
-spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
|
||||
-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
|
||||
describe(NameVsn) ->
|
||||
read_plugin_info(NameVsn, #{fill_readme => true}).
|
||||
|
||||
-spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
|
||||
plugin_avsc(NameVsn) ->
|
||||
-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
|
||||
plugin_schema_json(NameVsn) ->
|
||||
read_plugin_avsc(NameVsn).
|
||||
|
||||
-spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
|
||||
plugin_i18n(NameVsn) ->
|
||||
-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
|
||||
plugin_i18n_json(NameVsn) ->
|
||||
read_plugin_i18n(NameVsn).
|
||||
|
||||
-spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
|
||||
plugin_avro(NameVsn) ->
|
||||
read_plugin_avro(NameVsn).
|
||||
-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
|
||||
raw_plugin_config_content(NameVsn) ->
|
||||
read_plugin_hocon(NameVsn).
|
||||
|
||||
parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
|
||||
parse_name_vsn(binary_to_list(NameVsn));
|
||||
|
@ -309,46 +299,44 @@ get_config(Name, Vsn, Opt, Default) ->
|
|||
get_config(make_name_vsn_string(Name, Vsn), Opt, Default).
|
||||
|
||||
-spec get_config(name_vsn()) ->
|
||||
{ok, plugin_config() | any()}
|
||||
{ok, plugin_config_map() | any()}
|
||||
| {error, term()}.
|
||||
get_config(NameVsn) ->
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}).
|
||||
|
||||
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_AVRO) ->
|
||||
{ok, avro_binary() | plugin_config() | any()}
|
||||
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
|
||||
{ok, raw_plugin_config_content() | plugin_config_map() | any()}
|
||||
| {error, term()}.
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{});
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_AVRO) ->
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
|
||||
get_config_bin(NameVsn).
|
||||
|
||||
%% Present default config value only in map format.
|
||||
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
|
||||
{ok, plugin_config() | any()}
|
||||
{ok, plugin_config_map() | any()}
|
||||
| {error, term()}.
|
||||
get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
|
||||
{ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}.
|
||||
|
||||
get_config_bin(NameVsn) ->
|
||||
%% no default value when get raw binary config
|
||||
case read_plugin_avro(NameVsn) of
|
||||
{ok, _AvroJson} = Res -> Res;
|
||||
case read_plugin_hocon(NameVsn) of
|
||||
{ok, _Map} = Res -> Res;
|
||||
{error, _Reason} = Err -> Err
|
||||
end.
|
||||
|
||||
%% @doc Update plugin's config.
|
||||
%% RPC call from Management API or CLI.
|
||||
%% the avro Json Map and plugin config ALWAYS be valid before calling this function.
|
||||
put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
|
||||
put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig);
|
||||
put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
|
||||
HoconBin = hocon_pp:do(AvroJsonMap, #{}),
|
||||
ok = backup_and_write_avro_bin(NameVsn, HoconBin),
|
||||
%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function.
|
||||
put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
|
||||
put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig);
|
||||
put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) ->
|
||||
HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
|
||||
ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
|
||||
%% TODO: callback in plugin's on_config_changed (config update by mgmt API)
|
||||
%% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
|
||||
%% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
|
||||
%% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap),
|
||||
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
|
||||
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap),
|
||||
ok.
|
||||
|
||||
%% @doc Stop and then start the plugin.
|
||||
|
@ -360,7 +348,7 @@ restart(NameVsn) ->
|
|||
|
||||
%% @doc List all installed plugins.
|
||||
%% Including the ones that are installed, but not enabled in config.
|
||||
-spec list() -> [plugin()].
|
||||
-spec list() -> [plugin_info()].
|
||||
list() ->
|
||||
Pattern = filename:join([install_dir(), "*", "release.json"]),
|
||||
All = lists:filtermap(
|
||||
|
@ -381,10 +369,10 @@ list() ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Package utils
|
||||
|
||||
-spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
|
||||
decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
|
||||
decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap));
|
||||
decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
|
||||
-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
|
||||
decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
|
||||
decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
|
||||
decode_plugin_config_map(NameVsn, AvroJsonBin) ->
|
||||
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
|
||||
{ok, Config} -> {ok, Config};
|
||||
{error, ReasonMap} -> {error, ReasonMap}
|
||||
|
@ -712,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) ->
|
|||
read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options)
|
||||
).
|
||||
|
||||
read_plugin_avro(NameVsn) ->
|
||||
read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}).
|
||||
read_plugin_avro(NameVsn, Options) ->
|
||||
read_plugin_hocon(NameVsn) ->
|
||||
read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}).
|
||||
read_plugin_hocon(NameVsn, Options) ->
|
||||
tryit(
|
||||
atom_to_list(?FUNCTION_NAME),
|
||||
read_file_fun(plugin_config_file(NameVsn), "bad_avro_file", Options)
|
||||
read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options)
|
||||
).
|
||||
|
||||
ensure_exists_and_installed(NameVsn) ->
|
||||
|
@ -738,7 +726,7 @@ ensure_exists_and_installed(NameVsn) ->
|
|||
|
||||
do_get_from_cluster(NameVsn) ->
|
||||
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
||||
case get_from_any_node(Nodes, NameVsn, []) of
|
||||
case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
|
||||
{ok, TarContent} ->
|
||||
ok = file:write_file(pkg_file_path(NameVsn), TarContent),
|
||||
ok = do_ensure_installed(NameVsn);
|
||||
|
@ -761,19 +749,19 @@ do_get_from_cluster(NameVsn) ->
|
|||
{error, ErrMeta}
|
||||
end.
|
||||
|
||||
get_from_any_node([], _NameVsn, Errors) ->
|
||||
get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
|
||||
{error, Errors};
|
||||
get_from_any_node([Node | T], NameVsn, Errors) ->
|
||||
get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
|
||||
case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
|
||||
{ok, _} = Res ->
|
||||
Res;
|
||||
Err ->
|
||||
get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
||||
get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
||||
end.
|
||||
|
||||
get_avro_config_from_any_node([], _NameVsn, Errors) ->
|
||||
get_plugin_config_from_any_node([], _NameVsn, Errors) ->
|
||||
{error, Errors};
|
||||
get_avro_config_from_any_node([Node | T], NameVsn, Errors) ->
|
||||
get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
|
||||
case
|
||||
emqx_plugins_proto_v2:get_config(
|
||||
Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
|
||||
|
@ -782,7 +770,7 @@ get_avro_config_from_any_node([Node | T], NameVsn, Errors) ->
|
|||
{ok, _} = Res ->
|
||||
Res;
|
||||
Err ->
|
||||
get_avro_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
||||
get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
||||
end.
|
||||
|
||||
plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
|
||||
|
@ -1159,30 +1147,37 @@ do_create_config_dir(NameVsn) ->
|
|||
maybe_ensure_plugin_config(NameVsn) ->
|
||||
maybe
|
||||
true ?= with_plugin_avsc(NameVsn),
|
||||
_ = do_ensure_plugin_config(NameVsn)
|
||||
_ = ensure_plugin_config(NameVsn)
|
||||
else
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
do_ensure_plugin_config(NameVsn) ->
|
||||
%% fetch plugin avro config from cluster
|
||||
ensure_plugin_config(NameVsn) ->
|
||||
%% fetch plugin hocon config from cluster
|
||||
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
||||
case get_avro_config_from_any_node(Nodes, NameVsn, []) of
|
||||
{ok, AvroJsonMap} when is_map(AvroJsonMap) ->
|
||||
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
|
||||
ok = file:write_file(plugin_config_file(NameVsn), AvroJsonBin),
|
||||
ensure_avro_config(NameVsn);
|
||||
ensure_plugin_config(NameVsn, Nodes).
|
||||
ensure_plugin_config(NameVsn, []) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "default_plugin_config_used",
|
||||
name_vsn => NameVsn,
|
||||
reason => "no_other_running_nodes"
|
||||
}),
|
||||
cp_default_config_file(NameVsn);
|
||||
ensure_plugin_config(NameVsn, Nodes) ->
|
||||
case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
|
||||
{ok, ConfigMap} when is_map(ConfigMap) ->
|
||||
HoconBin = hocon_pp:do(ConfigMap, #{}),
|
||||
ok = file:write_file(plugin_config_file(NameVsn), HoconBin),
|
||||
ensure_config_map(NameVsn);
|
||||
_ ->
|
||||
?SLOG(info, #{msg => "config_not_found_from_cluster"}),
|
||||
%% otherwise cp default avro file
|
||||
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
|
||||
%% otherwise cp default hocon file
|
||||
%% i.e. Clean installation
|
||||
cp_default_config_file(NameVsn),
|
||||
ensure_avro_config(NameVsn)
|
||||
cp_default_config_file(NameVsn)
|
||||
end.
|
||||
|
||||
cp_default_config_file(NameVsn) ->
|
||||
%% always copy default avro file into config dir
|
||||
%% when can not get config from other nodes
|
||||
%% always copy default hocon file into config dir when can not get config from other nodes
|
||||
Source = default_plugin_config_file(NameVsn),
|
||||
Destination = plugin_config_file(NameVsn),
|
||||
maybe
|
||||
|
@ -1194,40 +1189,39 @@ cp_default_config_file(NameVsn) ->
|
|||
ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(warning, #{
|
||||
msg => "failed_to_copy_plugin_default_avro_config",
|
||||
msg => "failed_to_copy_plugin_default_hocon_config",
|
||||
source => Source,
|
||||
destination => Destination,
|
||||
reason => Reason
|
||||
})
|
||||
end
|
||||
else
|
||||
_ -> ensure_avro_config(NameVsn)
|
||||
_ -> ensure_config_map(NameVsn)
|
||||
end.
|
||||
|
||||
ensure_avro_config(NameVsn) ->
|
||||
ensure_config_map(NameVsn) ->
|
||||
with_plugin_avsc(NameVsn) andalso
|
||||
do_ensure_avro_config(NameVsn).
|
||||
do_ensure_config_map(NameVsn).
|
||||
|
||||
do_ensure_avro_config(NameVsn) ->
|
||||
case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of
|
||||
{ok, AvroJsonMap} ->
|
||||
{ok, Config} = decode_plugin_avro_config(
|
||||
NameVsn, emqx_utils_json:encode(AvroJsonMap)
|
||||
),
|
||||
put_config(NameVsn, AvroJsonMap, Config);
|
||||
do_ensure_config_map(NameVsn) ->
|
||||
case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
|
||||
{ok, ConfigJsonMap} ->
|
||||
{ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap),
|
||||
put_config(NameVsn, ConfigJsonMap, Config);
|
||||
_ ->
|
||||
?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
|
||||
ok
|
||||
end.
|
||||
|
||||
%% @private Backup the current config to a file with a timestamp suffix and
|
||||
%% then save the new config to the config file.
|
||||
backup_and_write_avro_bin(NameVsn, AvroBin) ->
|
||||
backup_and_write_hocon_bin(NameVsn, HoconBin) ->
|
||||
%% this may fail, but we don't care
|
||||
%% e.g. read-only file system
|
||||
Path = plugin_config_file(NameVsn),
|
||||
_ = filelib:ensure_dir(Path),
|
||||
TmpFile = Path ++ ".tmp",
|
||||
case file:write_file(TmpFile, AvroBin) of
|
||||
case file:write_file(TmpFile, HoconBin) of
|
||||
ok ->
|
||||
backup_and_replace(Path, TmpFile);
|
||||
{error, Reason} ->
|
||||
|
@ -1313,7 +1307,7 @@ plugin_dir(NameVsn) ->
|
|||
plugin_priv_dir(NameVsn) ->
|
||||
case read_plugin_info(NameVsn, #{fill_readme => false}) of
|
||||
{ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
|
||||
AppDir = <<Name/binary, "-", Vsn/binary>>,
|
||||
AppDir = make_name_vsn_string(Name, Vsn),
|
||||
wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
|
||||
_ ->
|
||||
wrap_list_path(filename:join([install_dir(), NameVsn, "priv"]))
|
||||
|
|
|
@ -27,8 +27,6 @@
|
|||
-include("emqx_plugins.hrl").
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
-type name_vsn() :: binary() | string().
|
||||
|
||||
introduced_in() ->
|
||||
"5.7.0".
|
||||
|
||||
|
|
Loading…
Reference in New Issue