From e5f7aa981738977e1b28d7a0dcf6863ae596b6a4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 22 May 2024 11:08:55 +0800 Subject: [PATCH] refactor: plguin functions and types rename --- .../src/emqx_mgmt_api_plugins.erl | 6 +- apps/emqx_plugins/include/emqx_plugins.hrl | 12 +- apps/emqx_plugins/src/emqx_plugins.erl | 160 +++++++++--------- .../src/proto/emqx_plugins_proto_v2.erl | 2 - 4 files changed, 91 insertions(+), 89 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index ca0d3ac10..ed5f016b3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -508,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) -> plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of + case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of {ok, AvroValueConfig} -> Nodes = emqx:running_nodes(), %% cluster call with config in map (binary key-value) @@ -702,8 +702,8 @@ aggregate_status([{Node, Plugins} | List], Acc) -> -if(?EMQX_RELEASE_EDITION == ee). format_plugin_avsc_and_i18n(NameVsn) -> #{ - avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), - i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end) }. try_read_file(Fun) -> diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index d1411884f..66f23d4b0 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -21,7 +21,7 @@ -define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab). --define(CONFIG_FORMAT_AVRO, config_format_avro). +-define(CONFIG_FORMAT_BIN, config_format_bin). -define(CONFIG_FORMAT_MAP, config_format_map). -define(plugin_conf_not_found, plugin_conf_not_found). @@ -32,6 +32,16 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). +%% "my_plugin-0.1.0" +-type name_vsn() :: binary() | string(). +%% the parse result of the JSON info file +-type plugin_info() :: map(). +-type schema_json_map() :: map(). +-type i18n_json_map() :: map(). +-type raw_plugin_config_content() :: binary(). +-type plugin_config_map() :: map(). +-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. + -record(plugin_schema_serde, { name :: schema_name(), eval_context :: term(), diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 1a8a679c6..ae97e7f5f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -28,9 +28,9 @@ -export([ describe/1, - plugin_avsc/1, - plugin_i18n/1, - plugin_avro/1, + plugin_schema_json/1, + plugin_i18n_json/1, + raw_plugin_config_content/1, parse_name_vsn/1, make_name_vsn_string/2 ]). @@ -69,7 +69,7 @@ %% Package utils -export([ - decode_plugin_avro_config/2, + decode_plugin_config_map/2, install_dir/0, avsc_file_path/1, with_plugin_avsc/1 @@ -85,7 +85,7 @@ %% Internal export -export([ - ensure_avro_config/1, + ensure_config_map/1, do_ensure_started/1 ]). %% for test cases @@ -104,36 +104,26 @@ -define(MAX_KEEP_BACKUP_CONFIGS, 10). -%% "my_plugin-0.1.0" --type name_vsn() :: binary() | string(). -%% the parse result of the JSON info file --type plugin() :: map(). --type schema_json() :: map(). --type i18n_json() :: map(). --type avro_binary() :: binary(). --type plugin_config() :: map(). --type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Describe a plugin. --spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. +-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}. describe(NameVsn) -> read_plugin_info(NameVsn, #{fill_readme => true}). --spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}. -plugin_avsc(NameVsn) -> +-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}. +plugin_schema_json(NameVsn) -> read_plugin_avsc(NameVsn). --spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}. -plugin_i18n(NameVsn) -> +-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}. +plugin_i18n_json(NameVsn) -> read_plugin_i18n(NameVsn). --spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}. -plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn). +-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}. +raw_plugin_config_content(NameVsn) -> + read_plugin_hocon(NameVsn). parse_name_vsn(NameVsn) when is_binary(NameVsn) -> parse_name_vsn(binary_to_list(NameVsn)); @@ -309,46 +299,44 @@ get_config(Name, Vsn, Opt, Default) -> get_config(make_name_vsn_string(Name, Vsn), Opt, Default). -spec get_config(name_vsn()) -> - {ok, plugin_config() | any()} + {ok, plugin_config_map() | any()} | {error, term()}. get_config(NameVsn) -> get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}). --spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_AVRO) -> - {ok, avro_binary() | plugin_config() | any()} +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) -> + {ok, raw_plugin_config_content() | plugin_config_map() | any()} | {error, term()}. get_config(NameVsn, ?CONFIG_FORMAT_MAP) -> get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}); -get_config(NameVsn, ?CONFIG_FORMAT_AVRO) -> +get_config(NameVsn, ?CONFIG_FORMAT_BIN) -> get_config_bin(NameVsn). %% Present default config value only in map format. -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) -> - {ok, plugin_config() | any()} + {ok, plugin_config_map() | any()} | {error, term()}. get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) -> {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. get_config_bin(NameVsn) -> %% no default value when get raw binary config - case read_plugin_avro(NameVsn) of - {ok, _AvroJson} = Res -> Res; + case read_plugin_hocon(NameVsn) of + {ok, _Map} = Res -> Res; {error, _Reason} = Err -> Err end. %% @doc Update plugin's config. %% RPC call from Management API or CLI. -%% the avro Json Map and plugin config ALWAYS be valid before calling this function. -put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> - put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig); -put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> - HoconBin = hocon_pp:do(AvroJsonMap, #{}), - ok = backup_and_write_avro_bin(NameVsn, HoconBin), +%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> + put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig); +put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) -> + HoconBin = hocon_pp:do(ConfigJsonMap, #{}), + ok = backup_and_write_hocon_bin(NameVsn, HoconBin), %% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) - %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), - %% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap), - ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap), ok. %% @doc Stop and then start the plugin. @@ -360,7 +348,7 @@ restart(NameVsn) -> %% @doc List all installed plugins. %% Including the ones that are installed, but not enabled in config. --spec list() -> [plugin()]. +-spec list() -> [plugin_info()]. list() -> Pattern = filename:join([install_dir(), "*", "release.json"]), All = lists:filtermap( @@ -381,10 +369,10 @@ list() -> %%-------------------------------------------------------------------- %% Package utils --spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. -decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> - decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap)); -decode_plugin_avro_config(NameVsn, AvroJsonBin) -> +-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. +decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> + decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap)); +decode_plugin_config_map(NameVsn, AvroJsonBin) -> case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of {ok, Config} -> {ok, Config}; {error, ReasonMap} -> {error, ReasonMap} @@ -712,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) -> read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options) ). -read_plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}). -read_plugin_avro(NameVsn, Options) -> +read_plugin_hocon(NameVsn) -> + read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}). +read_plugin_hocon(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(plugin_config_file(NameVsn), "bad_avro_file", Options) + read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options) ). ensure_exists_and_installed(NameVsn) -> @@ -738,7 +726,7 @@ ensure_exists_and_installed(NameVsn) -> do_get_from_cluster(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_from_any_node(Nodes, NameVsn, []) of + case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); @@ -761,19 +749,19 @@ do_get_from_cluster(NameVsn) -> {error, ErrMeta} end. -get_from_any_node([], _NameVsn, Errors) -> +get_plugin_tar_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_from_any_node([Node | T], NameVsn, Errors) -> +get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of {ok, _} = Res -> Res; Err -> - get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. -get_avro_config_from_any_node([], _NameVsn, Errors) -> +get_plugin_config_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> +get_plugin_config_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v2:get_config( Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000 @@ -782,7 +770,7 @@ get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> {ok, _} = Res -> Res; Err -> - get_avro_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. plugins_readme(NameVsn, #{fill_readme := true}, Info) -> @@ -1159,30 +1147,37 @@ do_create_config_dir(NameVsn) -> maybe_ensure_plugin_config(NameVsn) -> maybe true ?= with_plugin_avsc(NameVsn), - _ = do_ensure_plugin_config(NameVsn) + _ = ensure_plugin_config(NameVsn) else _ -> ok end. -do_ensure_plugin_config(NameVsn) -> - %% fetch plugin avro config from cluster +ensure_plugin_config(NameVsn) -> + %% fetch plugin hocon config from cluster Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_avro_config_from_any_node(Nodes, NameVsn, []) of - {ok, AvroJsonMap} when is_map(AvroJsonMap) -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = file:write_file(plugin_config_file(NameVsn), AvroJsonBin), - ensure_avro_config(NameVsn); + ensure_plugin_config(NameVsn, Nodes). +ensure_plugin_config(NameVsn, []) -> + ?SLOG(debug, #{ + msg => "default_plugin_config_used", + name_vsn => NameVsn, + reason => "no_other_running_nodes" + }), + cp_default_config_file(NameVsn); +ensure_plugin_config(NameVsn, Nodes) -> + case get_plugin_config_from_any_node(Nodes, NameVsn, []) of + {ok, ConfigMap} when is_map(ConfigMap) -> + HoconBin = hocon_pp:do(ConfigMap, #{}), + ok = file:write_file(plugin_config_file(NameVsn), HoconBin), + ensure_config_map(NameVsn); _ -> - ?SLOG(info, #{msg => "config_not_found_from_cluster"}), - %% otherwise cp default avro file + ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}), + %% otherwise cp default hocon file %% i.e. Clean installation - cp_default_config_file(NameVsn), - ensure_avro_config(NameVsn) + cp_default_config_file(NameVsn) end. cp_default_config_file(NameVsn) -> - %% always copy default avro file into config dir - %% when can not get config from other nodes + %% always copy default hocon file into config dir when can not get config from other nodes Source = default_plugin_config_file(NameVsn), Destination = plugin_config_file(NameVsn), maybe @@ -1194,40 +1189,39 @@ cp_default_config_file(NameVsn) -> ok; {error, Reason} -> ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", + msg => "failed_to_copy_plugin_default_hocon_config", source => Source, destination => Destination, reason => Reason }) end else - _ -> ensure_avro_config(NameVsn) + _ -> ensure_config_map(NameVsn) end. -ensure_avro_config(NameVsn) -> +ensure_config_map(NameVsn) -> with_plugin_avsc(NameVsn) andalso - do_ensure_avro_config(NameVsn). + do_ensure_config_map(NameVsn). -do_ensure_avro_config(NameVsn) -> - case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of - {ok, AvroJsonMap} -> - {ok, Config} = decode_plugin_avro_config( - NameVsn, emqx_utils_json:encode(AvroJsonMap) - ), - put_config(NameVsn, AvroJsonMap, Config); +do_ensure_config_map(NameVsn) -> + case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of + {ok, ConfigJsonMap} -> + {ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap), + put_config(NameVsn, ConfigJsonMap, Config); _ -> + ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}), ok end. %% @private Backup the current config to a file with a timestamp suffix and %% then save the new config to the config file. -backup_and_write_avro_bin(NameVsn, AvroBin) -> +backup_and_write_hocon_bin(NameVsn, HoconBin) -> %% this may fail, but we don't care %% e.g. read-only file system Path = plugin_config_file(NameVsn), _ = filelib:ensure_dir(Path), TmpFile = Path ++ ".tmp", - case file:write_file(TmpFile, AvroBin) of + case file:write_file(TmpFile, HoconBin) of ok -> backup_and_replace(Path, TmpFile); {error, Reason} -> @@ -1313,7 +1307,7 @@ plugin_dir(NameVsn) -> plugin_priv_dir(NameVsn) -> case read_plugin_info(NameVsn, #{fill_readme => false}) of {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> - AppDir = <>, + AppDir = make_name_vsn_string(Name, Vsn), wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); _ -> wrap_list_path(filename:join([install_dir(), NameVsn, "priv"])) diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 0e933d351..fa91a8512 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -27,8 +27,6 @@ -include("emqx_plugins.hrl"). -include_lib("emqx/include/bpapi.hrl"). --type name_vsn() :: binary() | string(). - introduced_in() -> "5.7.0".