diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 10d27fc63..7c78d43d9 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -64,6 +64,7 @@ {emqx_node_rebalance_status,2}. {emqx_persistent_session_ds,1}. {emqx_plugins,1}. +{emqx_plugins,2}. {emqx_prometheus,1}. {emqx_prometheus,2}. {emqx_resource,1}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 94b16320a..ed5f016b3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -21,6 +21,8 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_plugins/include/emqx_plugins.hrl"). +-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}). + -export([ api_spec/0, fields/1, @@ -178,6 +180,9 @@ schema("/plugins/:name/config") -> responses => #{ %% avro data, json encoded 200 => hoconsc:mk(binary()), + 400 => emqx_dashboard_swagger:error_codes( + ['BAD_CONFIG'], <<"Plugin Config Not Found">> + ), 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } }, @@ -488,13 +493,13 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> plugin_config(get, #{bindings := #{name := NameVsn}}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:get_config(NameVsn) of - {ok, AvroJson} -> + case emqx_plugins:get_config(NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found) of + {ok, AvroJson} when is_map(AvroJson) -> {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson}; - {error, _} -> + {ok, ?plugin_conf_not_found} -> {400, #{ code => 'BAD_CONFIG', - message => <<"Failed to get plugin config">> + message => <<"Plugin Config Not Found">> }} end; _ -> @@ -503,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) -> plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> case emqx_plugins:describe(NameVsn) of {ok, _} -> - case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of + case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of {ok, AvroValueConfig} -> Nodes = emqx:running_nodes(), %% cluster call with config in map (binary key-value) @@ -534,7 +539,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> {error, Reason} -> {400, #{code => 'BAD_POSITION', message => Reason}}; Position -> - case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of + case emqx_plugins:ensure_enabled(Name, Position, global) of ok -> {204}; {error, Reason} -> @@ -599,9 +604,9 @@ ensure_action(Name, restart) -> ok. %% for RPC plugin avro encoded config update -do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) -> +do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) -> %% TODO: maybe use `PluginConfigMap` to validate config - emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap). + emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap). %%-------------------------------------------------------------------- %% Helper functions @@ -694,10 +699,11 @@ aggregate_status([{Node, Plugins} | List], Acc) -> ), aggregate_status(List, NewAcc). +-if(?EMQX_RELEASE_EDITION == ee). format_plugin_avsc_and_i18n(NameVsn) -> #{ - avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end), - i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end) + avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end), + i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end) }. try_read_file(Fun) -> @@ -706,6 +712,11 @@ try_read_file(Fun) -> _ -> null end. +-else. +format_plugin_avsc_and_i18n(_NameVsn) -> + #{avsc => null, i18n => null}. +-endif. + % running_status: running loaded, stopped %% config_status: not_configured disable enable plugin_status(#{running_status := running}) -> running; diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index f822b9c8d..66f23d4b0 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -21,15 +21,27 @@ -define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab). --define(CONFIG_FORMAT_AVRO, config_format_avro). +-define(CONFIG_FORMAT_BIN, config_format_bin). -define(CONFIG_FORMAT_MAP, config_format_map). +-define(plugin_conf_not_found, plugin_conf_not_found). + -type schema_name() :: binary(). -type avsc_path() :: string(). -type encoded_data() :: iodata(). -type decoded_data() :: map(). +%% "my_plugin-0.1.0" +-type name_vsn() :: binary() | string(). +%% the parse result of the JSON info file +-type plugin_info() :: map(). +-type schema_json_map() :: map(). +-type i18n_json_map() :: map(). +-type raw_plugin_config_content() :: binary(). +-type plugin_config_map() :: map(). +-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. + -record(plugin_schema_serde, { name :: schema_name(), eval_context :: term(), diff --git a/apps/emqx_plugins/src/emqx_plugins.appup.src b/apps/emqx_plugins/src/emqx_plugins.appup.src deleted file mode 100644 index f9474dd33..000000000 --- a/apps/emqx_plugins/src/emqx_plugins.appup.src +++ /dev/null @@ -1,8 +0,0 @@ -%% -*- mode: erlang -*- -{"0.1.0", - [ {<<".*">>, []} - ], - [ - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 108ef1386..ae97e7f5f 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -16,8 +16,11 @@ -module(emqx_plugins). --include_lib("emqx/include/logger.hrl"). +-feature(maybe_expr, enable). + -include("emqx_plugins.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -25,15 +28,16 @@ -export([ describe/1, - plugin_avsc/1, - plugin_i18n/1, - plugin_avro/1, + plugin_schema_json/1, + plugin_i18n_json/1, + raw_plugin_config_content/1, parse_name_vsn/1, make_name_vsn_string/2 ]). %% Package operations -export([ + ensure_installed/0, ensure_installed/1, ensure_uninstalled/1, ensure_enabled/1, @@ -65,9 +69,10 @@ %% Package utils -export([ - decode_plugin_avro_config/2, + decode_plugin_config_map/2, install_dir/0, - avsc_file_path/1 + avsc_file_path/1, + with_plugin_avsc/1 ]). %% `emqx_config_handler' API @@ -79,7 +84,10 @@ -export([get_tar/1]). %% Internal export --export([do_ensure_started/1]). +-export([ + ensure_config_map/1, + do_ensure_started/1 +]). %% for test cases -export([put_config_internal/2]). @@ -96,36 +104,26 @@ -define(MAX_KEEP_BACKUP_CONFIGS, 10). -%% "my_plugin-0.1.0" --type name_vsn() :: binary() | string(). -%% the parse result of the JSON info file --type plugin() :: map(). --type schema_json() :: map(). --type i18n_json() :: map(). --type avro_binary() :: binary(). --type plugin_config() :: map(). --type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Describe a plugin. --spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. +-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}. describe(NameVsn) -> read_plugin_info(NameVsn, #{fill_readme => true}). --spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}. -plugin_avsc(NameVsn) -> +-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}. +plugin_schema_json(NameVsn) -> read_plugin_avsc(NameVsn). --spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}. -plugin_i18n(NameVsn) -> +-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}. +plugin_i18n_json(NameVsn) -> read_plugin_i18n(NameVsn). --spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}. -plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn). +-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}. +raw_plugin_config_content(NameVsn) -> + read_plugin_hocon(NameVsn). parse_name_vsn(NameVsn) when is_binary(NameVsn) -> parse_name_vsn(binary_to_list(NameVsn)); @@ -141,15 +139,32 @@ make_name_vsn_string(Name, Vsn) -> %%-------------------------------------------------------------------- %% Package operations +%% @doc Start all configured plugins are started. +-spec ensure_installed() -> ok. +ensure_installed() -> + Fun = fun(#{name_vsn := NameVsn}) -> + case ensure_installed(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end + end, + ok = for_plugins(Fun). + %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, map()}. ensure_installed(NameVsn) -> case read_plugin_info(NameVsn, #{}) of {ok, _} -> - ok; + ok, + _ = maybe_ensure_plugin_config(NameVsn); {error, _} -> ok = purge(NameVsn), - do_ensure_installed(NameVsn) + case ensure_exists_and_installed(NameVsn) of + ok -> + maybe_post_op_after_installed(NameVsn); + {error, _Reason} = Err -> + Err + end end. %% @doc Ensure files and directories for the given plugin are being deleted. @@ -230,7 +245,17 @@ delete_package(NameVsn) -> %% @doc Start all configured plugins are started. -spec ensure_started() -> ok. ensure_started() -> - ok = for_plugins(fun ?MODULE:do_ensure_started/1). + Fun = fun + (#{name_vsn := NameVsn, enable := true}) -> + case do_ensure_started(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; + (#{name_vsn := NameVsn, enable := false}) -> + ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}), + [] + end, + ok = for_plugins(Fun). %% @doc Start a plugin from Management API or CLI. %% the input is a - string. @@ -247,7 +272,17 @@ ensure_started(NameVsn) -> %% @doc Stop all plugins before broker stops. -spec ensure_stopped() -> ok. ensure_stopped() -> - for_plugins(fun ?MODULE:ensure_stopped/1). + Fun = fun + (#{name_vsn := NameVsn, enable := true}) -> + case ensure_stopped(NameVsn) of + ok -> []; + {error, Reason} -> [{NameVsn, Reason}] + end; + (#{name_vsn := NameVsn, enable := false}) -> + ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}), + [] + end, + ok = for_plugins(Fun). %% @doc Stop a plugin from Management API or CLI. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}. @@ -260,37 +295,48 @@ ensure_stopped(NameVsn) -> end ). -get_config(Name, Vsn, Options, Default) -> - get_config(make_name_vsn_string(Name, Vsn), Options, Default). +get_config(Name, Vsn, Opt, Default) -> + get_config(make_name_vsn_string(Name, Vsn), Opt, Default). -spec get_config(name_vsn()) -> - {ok, plugin_config()} + {ok, plugin_config_map() | any()} | {error, term()}. get_config(NameVsn) -> - get_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}). + get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}). --spec get_config(name_vsn(), Options :: map()) -> - {ok, avro_binary() | plugin_config()} +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) -> + {ok, raw_plugin_config_content() | plugin_config_map() | any()} | {error, term()}. -get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> - %% no default value when get raw binary config - case read_plugin_avro(NameVsn) of - {ok, _AvroJson} = Res -> Res; - {error, _Reason} = Err -> Err - end; -get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) -> - get_config(NameVsn, Options, #{}). +get_config(NameVsn, ?CONFIG_FORMAT_MAP) -> + get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}); +get_config(NameVsn, ?CONFIG_FORMAT_BIN) -> + get_config_bin(NameVsn). -get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> - {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}. +%% Present default config value only in map format. +-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) -> + {ok, plugin_config_map() | any()} + | {error, term()}. +get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) -> + {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}. + +get_config_bin(NameVsn) -> + %% no default value when get raw binary config + case read_plugin_hocon(NameVsn) of + {ok, _Map} = Res -> Res; + {error, _Reason} = Err -> Err + end. %% @doc Update plugin's config. %% RPC call from Management API or CLI. -%% the avro Json Map and plugin config ALWAYS be valid before calling this function. -put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> - AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), - ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), +%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> + put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig); +put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) -> + HoconBin = hocon_pp:do(ConfigJsonMap, #{}), + ok = backup_and_write_hocon_bin(NameVsn, HoconBin), + %% TODO: callback in plugin's on_config_changed (config update by mgmt API) + %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) + ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap), ok. %% @doc Stop and then start the plugin. @@ -302,7 +348,7 @@ restart(NameVsn) -> %% @doc List all installed plugins. %% Including the ones that are installed, but not enabled in config. --spec list() -> [plugin()]. +-spec list() -> [plugin_info()]. list() -> Pattern = filename:join([install_dir(), "*", "release.json"]), All = lists:filtermap( @@ -323,15 +369,24 @@ list() -> %%-------------------------------------------------------------------- %% Package utils --spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. -decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> - decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap)); -decode_plugin_avro_config(NameVsn, AvroJsonBin) -> +-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. +decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> + decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap)); +decode_plugin_config_map(NameVsn, AvroJsonBin) -> case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of {ok, Config} -> {ok, Config}; {error, ReasonMap} -> {error, ReasonMap} end. +-spec with_plugin_avsc(name_vsn()) -> boolean(). +with_plugin_avsc(NameVsn) -> + case read_plugin_info(NameVsn, #{fill_readme => false}) of + {ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) -> + WithAvsc; + _ -> + false + end. + get_config_interal(Key, Default) when is_atom(Key) -> get_config_interal([Key], Default); get_config_interal(Path, Default) -> @@ -438,7 +493,6 @@ do_ensure_installed(NameVsn) -> ok = write_tar_file_content(install_dir(), TarContent), case read_plugin_info(NameVsn, #{}) of {ok, _} -> - ok = maybe_post_op_after_install(NameVsn), ok; {error, Reason} -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), @@ -603,7 +657,11 @@ tryit(WhichOp, F) -> exception => Reason, stacktrace => Stacktrace }), - {error, {failed, WhichOp}} + {error, #{ + which_op => WhichOp, + exception => Reason, + stacktrace => Stacktrace + }} end. %% read plugin info from the JSON file @@ -611,16 +669,16 @@ tryit(WhichOp, F) -> read_plugin_info(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - fun() -> {ok, do_read_plugin2(NameVsn, Options)} end + fun() -> {ok, do_read_plugin(NameVsn, Options)} end ). do_read_plugin(NameVsn) -> - do_read_plugin2(NameVsn, #{}). + do_read_plugin(NameVsn, #{}). -do_read_plugin2(NameVsn, Option) -> - do_read_plugin3(NameVsn, info_file_path(NameVsn), Option). +do_read_plugin(NameVsn, Option) -> + do_read_plugin(NameVsn, info_file_path(NameVsn), Option). -do_read_plugin3(NameVsn, InfoFilePath, Options) -> +do_read_plugin(NameVsn, InfoFilePath, Options) -> {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(), Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath), Info1 = plugins_readme(NameVsn, Options, Info0), @@ -642,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) -> read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options) ). -read_plugin_avro(NameVsn) -> - read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}). -read_plugin_avro(NameVsn, Options) -> +read_plugin_hocon(NameVsn) -> + read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}). +read_plugin_hocon(NameVsn, Options) -> tryit( atom_to_list(?FUNCTION_NAME), - read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options) + read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options) ). ensure_exists_and_installed(NameVsn) -> @@ -659,7 +717,7 @@ ensure_exists_and_installed(NameVsn) -> case get_tar(NameVsn) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), - ok = do_ensure_installed(NameVsn); + do_ensure_installed(NameVsn); _ -> %% If not, try to get it from the cluster. do_get_from_cluster(NameVsn) @@ -668,33 +726,51 @@ ensure_exists_and_installed(NameVsn) -> do_get_from_cluster(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_from_any_node(Nodes, NameVsn, []) of + case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of {ok, TarContent} -> ok = file:write_file(pkg_file_path(NameVsn), TarContent), ok = do_ensure_installed(NameVsn); {error, NodeErrors} when Nodes =/= [] -> - ?SLOG(error, #{ - msg => "failed_to_copy_plugin_from_other_nodes", + ErrMeta = #{ + error_msg => "failed_to_copy_plugin_from_other_nodes", name_vsn => NameVsn, - node_errors => NodeErrors - }), - {error, plugin_not_found}; + node_errors => NodeErrors, + reason => not_found + }, + ?SLOG(error, ErrMeta), + {error, ErrMeta}; {error, _} -> - ?SLOG(error, #{ - msg => "no_nodes_to_copy_plugin_from", - name_vsn => NameVsn - }), - {error, plugin_not_found} + ErrMeta = #{ + error_msg => "no_nodes_to_copy_plugin_from", + name_vsn => NameVsn, + reason => not_found + }, + ?SLOG(error, ErrMeta), + {error, ErrMeta} end. -get_from_any_node([], _NameVsn, Errors) -> +get_plugin_tar_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_from_any_node([Node | T], NameVsn, Errors) -> +get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) -> case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of {ok, _} = Res -> Res; Err -> - get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + end. + +get_plugin_config_from_any_node([], _NameVsn, Errors) -> + {error, Errors}; +get_plugin_config_from_any_node([Node | T], NameVsn, Errors) -> + case + emqx_plugins_proto_v2:get_config( + Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000 + ) + of + {ok, _} = Res -> + Res; + Err -> + get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. plugins_readme(NameVsn, #{fill_readme := true}, Info) -> @@ -1011,29 +1087,31 @@ configured() -> get_config_interal(states, []). for_plugins(ActionFun) -> - case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of - [] -> ok; - Errors -> erlang:error(#{function => ActionFun, errors => Errors}) + case lists:flatmap(ActionFun, configured()) of + [] -> + ok; + Errors -> + ErrMeta = #{function => ActionFun, errors => Errors}, + ?tp( + for_plugins_action_error_occurred, + ErrMeta + ), + ?SLOG(error, ErrMeta), + ok end. -for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> - case Fun(NameVsn) of - ok -> []; - {error, Reason} -> [{NameVsn, Reason}] - end; -for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> - ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}), - []. - -maybe_post_op_after_install(NameVsn) -> +maybe_post_op_after_installed(NameVsn) -> _ = maybe_load_config_schema(NameVsn), - _ = maybe_create_config_dir(NameVsn), + _ = ensure_state(NameVsn, no_move, false, global), ok. maybe_load_config_schema(NameVsn) -> AvscPath = avsc_file_path(NameVsn), - filelib:is_regular(AvscPath) andalso - do_load_config_schema(NameVsn, AvscPath). + _ = + with_plugin_avsc(NameVsn) andalso + filelib:is_regular(AvscPath) andalso + do_load_config_schema(NameVsn, AvscPath), + _ = maybe_create_config_dir(NameVsn). do_load_config_schema(NameVsn, AvscPath) -> case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of @@ -1043,28 +1121,107 @@ do_load_config_schema(NameVsn, AvscPath) -> end. maybe_create_config_dir(NameVsn) -> - ConfigDir = plugin_config_dir(NameVsn), - case filelib:ensure_path(ConfigDir) of - ok -> - ok; + with_plugin_avsc(NameVsn) andalso + do_create_config_dir(NameVsn). + +do_create_config_dir(NameVsn) -> + case plugin_config_dir(NameVsn) of {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_create_plugin_config_dir", - dir => ConfigDir, - reason => Reason - }), - {error, {mkdir_failed, ConfigDir, Reason}} + {error, {gen_config_dir_failed, Reason}}; + ConfigDir -> + case filelib:ensure_path(ConfigDir) of + ok -> + %% get config from other nodes or get from tarball + _ = maybe_ensure_plugin_config(NameVsn), + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_create_plugin_config_dir", + dir => ConfigDir, + reason => Reason + }), + {error, {mkdir_failed, ConfigDir, Reason}} + end + end. + +maybe_ensure_plugin_config(NameVsn) -> + maybe + true ?= with_plugin_avsc(NameVsn), + _ = ensure_plugin_config(NameVsn) + else + _ -> ok + end. + +ensure_plugin_config(NameVsn) -> + %% fetch plugin hocon config from cluster + Nodes = [N || N <- mria:running_nodes(), N /= node()], + ensure_plugin_config(NameVsn, Nodes). +ensure_plugin_config(NameVsn, []) -> + ?SLOG(debug, #{ + msg => "default_plugin_config_used", + name_vsn => NameVsn, + reason => "no_other_running_nodes" + }), + cp_default_config_file(NameVsn); +ensure_plugin_config(NameVsn, Nodes) -> + case get_plugin_config_from_any_node(Nodes, NameVsn, []) of + {ok, ConfigMap} when is_map(ConfigMap) -> + HoconBin = hocon_pp:do(ConfigMap, #{}), + ok = file:write_file(plugin_config_file(NameVsn), HoconBin), + ensure_config_map(NameVsn); + _ -> + ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}), + %% otherwise cp default hocon file + %% i.e. Clean installation + cp_default_config_file(NameVsn) + end. + +cp_default_config_file(NameVsn) -> + %% always copy default hocon file into config dir when can not get config from other nodes + Source = default_plugin_config_file(NameVsn), + Destination = plugin_config_file(NameVsn), + maybe + true ?= filelib:is_regular(Source), + %% destination path not existed (not configured) + true ?= (not filelib:is_regular(Destination)), + case file:copy(Source, Destination) of + {ok, _} -> + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_hocon_config", + source => Source, + destination => Destination, + reason => Reason + }) + end + else + _ -> ensure_config_map(NameVsn) + end. + +ensure_config_map(NameVsn) -> + with_plugin_avsc(NameVsn) andalso + do_ensure_config_map(NameVsn). + +do_ensure_config_map(NameVsn) -> + case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of + {ok, ConfigJsonMap} -> + {ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap), + put_config(NameVsn, ConfigJsonMap, Config); + _ -> + ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}), + ok end. %% @private Backup the current config to a file with a timestamp suffix and %% then save the new config to the config file. -backup_and_write_avro_bin(NameVsn, AvroBin) -> +backup_and_write_hocon_bin(NameVsn, HoconBin) -> %% this may fail, but we don't care %% e.g. read-only file system - Path = avro_config_file(NameVsn), + Path = plugin_config_file(NameVsn), _ = filelib:ensure_dir(Path), TmpFile = Path ++ ".tmp", - case file:write_file(TmpFile, AvroBin) of + case file:write_file(TmpFile, HoconBin) of ok -> backup_and_replace(Path, TmpFile); {error, Reason} -> @@ -1146,9 +1303,29 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> plugin_dir(NameVsn) -> wrap_list_path(filename:join([install_dir(), NameVsn])). --spec plugin_config_dir(name_vsn()) -> string(). +-spec plugin_priv_dir(name_vsn()) -> string(). +plugin_priv_dir(NameVsn) -> + case read_plugin_info(NameVsn, #{fill_readme => false}) of + {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> + AppDir = make_name_vsn_string(Name, Vsn), + wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); + _ -> + wrap_list_path(filename:join([install_dir(), NameVsn, "priv"])) + end. + +-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. plugin_config_dir(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])). + case parse_name_vsn(NameVsn) of + {ok, NameAtom, _Vsn} -> + wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)])); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_generate_plugin_config_dir_for_plugin", + plugin_namevsn => NameVsn, + reason => Reason + }), + {error, Reason} + end. %% Files -spec pkg_file_path(name_vsn()) -> string(). @@ -1161,15 +1338,20 @@ info_file_path(NameVsn) -> -spec avsc_file_path(name_vsn()) -> string(). avsc_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])). + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])). --spec avro_config_file(name_vsn()) -> string(). -avro_config_file(NameVsn) -> - wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])). +-spec plugin_config_file(name_vsn()) -> string(). +plugin_config_file(NameVsn) -> + wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.hocon"])). + +%% should only used when plugin installing +-spec default_plugin_config_file(name_vsn()) -> string(). +default_plugin_config_file(NameVsn) -> + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])). -spec i18n_file_path(name_vsn()) -> string(). i18n_file_path(NameVsn) -> - wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])). + wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])). -spec readme_file(name_vsn()) -> string(). readme_file(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_app.erl b/apps/emqx_plugins/src/emqx_plugins_app.erl index a8b2fd6d6..10e483c22 100644 --- a/apps/emqx_plugins/src/emqx_plugins_app.erl +++ b/apps/emqx_plugins/src/emqx_plugins_app.erl @@ -27,8 +27,9 @@ start(_Type, _Args) -> %% load all pre-configured - ok = emqx_plugins:ensure_started(), {ok, Sup} = emqx_plugins_sup:start_link(), + ok = emqx_plugins:ensure_installed(), + ok = emqx_plugins:ensure_started(), ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins), {ok, Sup}. diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index b50258b9c..b76f71991 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -33,7 +33,6 @@ init/1, handle_call/3, handle_cast/2, - handle_continue/2, terminate/2 ]). @@ -126,11 +125,10 @@ init(_) -> ]), State = #{}, AvscPaths = get_plugin_avscs(), - {ok, State, {continue, {build_serdes, AvscPaths}}}. - -handle_continue({build_serdes, AvscPaths}, State) -> + %% force build all schemas at startup + %% otherwise plugin schema may not be available when needed _ = build_serdes(AvscPaths), - {noreply, State}. + {ok, State}. handle_call({build_serdes, NameVsn, AvscPath}, _From, State) -> BuildRes = do_build_serde({NameVsn, AvscPath}), @@ -153,10 +151,10 @@ terminate(_Reason, _State) -> -spec get_plugin_avscs() -> [{string(), string()}]. get_plugin_avscs() -> - Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), + Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]), lists:foldl( fun(AvscPath, AccIn) -> - [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + [_, _, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)), [{to_bin(NameVsn), AvscPath} | AccIn] end, _Acc0 = [], diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl new file mode 100644 index 000000000..fa91a8512 --- /dev/null +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_tar/3, + get_config/5 +]). + +-include("emqx_plugins.hrl"). +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any()}. +get_tar(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). + +-spec get_config( + node(), name_vsn(), ?CONFIG_FORMAT_MAP, any(), timeout() +) -> {ok, map() | any()} | {error, any()}. +get_config(Node, NameVsn, Opt, Default, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opt, Default], Timeout). diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 80f3d7a48..5c8c51f1e 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin). -define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)). @@ -273,9 +274,15 @@ t_start_restart_and_stop(Config) -> %% fake enable bar-2 ok = ensure_state(Bar2, rear, true), %% should cause an error - ?assertError( - #{function := _, errors := [_ | _]}, - emqx_plugins:ensure_started() + ?check_trace( + emqx_plugins:ensure_started(), + fun(Trace) -> + ?assertMatch( + [#{function := _, errors := [_ | _]}], + ?of_kind(for_plugins_action_error_occurred, Trace) + ), + ok + end ), %% but demo plugin should still be running assert_app_running(?EMQX_PLUGIN_APP_NAME, true), @@ -337,7 +344,7 @@ t_enable_disable({'end', Config}) -> t_enable_disable(Config) -> NameVsn = proplists:get_value(name_vsn, Config), ok = emqx_plugins:ensure_installed(NameVsn), - ?assertEqual([], emqx_plugins:configured()), + ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()), ok = emqx_plugins:ensure_enabled(NameVsn), ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()), ok = emqx_plugins:ensure_disabled(NameVsn), @@ -379,9 +386,10 @@ t_bad_tar_gz(Config) -> }}, emqx_plugins:ensure_installed("fake-vsn") ), + %% the plugin tarball can not be found on any nodes ?assertMatch( {error, #{ - error_msg := "failed_to_extract_plugin_package", + error_msg := "no_nodes_to_copy_plugin_from", reason := not_found }}, emqx_plugins:ensure_installed("nonexisting") @@ -556,7 +564,7 @@ t_load_config_from_cli({'end', Config}) -> t_load_config_from_cli(Config) when is_list(Config) -> NameVsn = ?config(name_vsn, Config), ok = emqx_plugins:ensure_installed(NameVsn), - ?assertEqual([], emqx_plugins:configured()), + ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()), ok = emqx_plugins:ensure_enabled(NameVsn), ok = emqx_plugins:ensure_started(NameVsn), Params0 = unused, @@ -687,6 +695,14 @@ group_t_copy_plugin_to_a_new_node(Config) -> %% see: emqx_conf_app:init_conf/0 ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]), {ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]), + + %% Plugin config should be synced from `CopyFromNode` + %% by application `emqx` and `emqx_conf` + %% FIXME: in test case, we manually do it here + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []), + ?assertMatch( {ok, #{running_status := running, config_status := enabled}}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn]) @@ -739,6 +755,16 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) -> ct:pal("~p install_dir:\n ~p", [ CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir]) ]), + + %% Plugin config should be synced from `CopyFromNode` + %% by application `emqx` and `emqx_conf` + %% FIXME: in test case, we manually do it here + ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [ + [states], [#{enable => true, name_vsn => NameVsn}] + ]), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []), + ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []), + ?assertMatch( {ok, #{running_status := running, config_status := enabled}}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn]) @@ -785,6 +811,11 @@ group_t_cluster_leave(Config) -> ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]), ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]), ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]), + + ok = erpc:call(N2, emqx_plugins, ensure_installed, [NameVsn]), + ok = erpc:call(N2, emqx_plugins, ensure_started, [NameVsn]), + ok = erpc:call(N2, emqx_plugins, ensure_enabled, [NameVsn]), + Params = unused, %% 2 nodes running ?assertMatch( diff --git a/scripts/ensure-rebar3.sh b/scripts/ensure-rebar3.sh index 054deabd4..14b6ad7c7 100755 --- a/scripts/ensure-rebar3.sh +++ b/scripts/ensure-rebar3.sh @@ -4,17 +4,8 @@ set -euo pipefail [ "${DEBUG:-0}" -eq 1 ] && set -x -## rebar3 tag 3.19.0-emqx-1 is compiled using latest official OTP-24 image. -## we have to use an otp24-compiled rebar3 because the defination of record #application{} -## in systools.hrl is changed in otp24. OTP_VSN="${OTP_VSN:-$(./scripts/get-otp-vsn.sh)}" case ${OTP_VSN} in - 23*) - VERSION="3.16.1-emqx-1" - ;; - 24*) - VERSION="3.18.0-emqx-1" - ;; 25*) VERSION="3.19.0-emqx-9" ;; @@ -22,7 +13,7 @@ case ${OTP_VSN} in VERSION="3.20.0-emqx-1" ;; *) - echo "Unsupporetd Erlang/OTP version $OTP_VSN" + echo "Unsupported Erlang/OTP version $OTP_VSN" exit 1 ;; esac diff --git a/scripts/get-otp-vsn.sh b/scripts/get-otp-vsn.sh index 699a16f3f..d21e2c830 100755 --- a/scripts/get-otp-vsn.sh +++ b/scripts/get-otp-vsn.sh @@ -2,4 +2,4 @@ set -euo pipefail -erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().' +erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().'