diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index f822b9c8d..d1411884f 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -24,6 +24,8 @@ -define(CONFIG_FORMAT_AVRO, config_format_avro). -define(CONFIG_FORMAT_MAP, config_format_map). +-define(plugin_conf_not_found, plugin_conf_not_found). + -type schema_name() :: binary(). -type avsc_path() :: string(). diff --git a/apps/emqx_plugins/src/emqx_plugins.appup.src b/apps/emqx_plugins/src/emqx_plugins.appup.src deleted file mode 100644 index f9474dd33..000000000 --- a/apps/emqx_plugins/src/emqx_plugins.appup.src +++ /dev/null @@ -1,8 +0,0 @@ -%% -*- mode: erlang -*- -{"0.1.0", - [ {<<".*">>, []} - ], - [ - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 1f98b2bd4..156edce0b 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -80,7 +80,10 @@ -export([get_tar/1]). %% Internal export --export([do_ensure_started/1]). +-export([ + ensure_avro_config/1, + do_ensure_started/1 +]). %% for test cases -export([put_config_internal/2]). @@ -150,7 +153,8 @@ ensure_installed(NameVsn) -> ok; {error, _} -> ok = purge(NameVsn), - do_ensure_installed(NameVsn) + do_ensure_installed(NameVsn), + ok = maybe_post_op_after_install(NameVsn) end. %% @doc Ensure files and directories for the given plugin are being deleted. @@ -288,9 +292,13 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> %% @doc Update plugin's config. %% RPC call from Management API or CLI. %% the avro Json Map and plugin config ALWAYS be valid before calling this function. +put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> + put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig); put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), + %% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn), + %% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok. @@ -581,8 +589,6 @@ do_ensure_started(NameVsn) -> case ensure_exists_and_installed(NameVsn) of ok -> Plugin = do_read_plugin(NameVsn), - %% ok = ensure_avro_config(NameVsn); - ok = maybe_post_op_after_install(NameVsn), ok = load_code_start_apps(NameVsn, Plugin); {error, plugin_not_found} -> ?SLOG(error, #{ @@ -614,7 +620,11 @@ tryit(WhichOp, F) -> exception => Reason, stacktrace => Stacktrace }), - {error, {failed, WhichOp}} + {error, #{ + which_op => WhichOp, + exception => Reason, + stacktrace => Stacktrace + }} end. %% read plugin info from the JSON file @@ -708,10 +718,14 @@ get_from_any_node([Node | T], NameVsn, Errors) -> get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) end. -get_config_from_any_node([], _NameVsn, Errors) -> +get_avro_config_from_any_node([], _NameVsn, Errors) -> {error, Errors}; -get_config_from_any_node([Node | T], NameVsn, Errors) -> - case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of +get_avro_config_from_any_node([Node | T], NameVsn, Errors) -> + case + emqx_plugins_proto_v2:get_config( + Node, NameVsn, #{format => ?CONFIG_FORMAT_MAP}, ?plugin_conf_not_found, 5_000 + ) + of {ok, _} = Res -> Res; Err -> @@ -1041,6 +1055,8 @@ for_plugins(ActionFun) -> end. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> + %% always ensure the plugin is installed + ok = ensure_avro_config(NameVsn), case Fun(NameVsn) of ok -> []; {error, Reason} -> [{NameVsn, Reason}] @@ -1094,31 +1110,41 @@ do_create_config_dir(NameVsn) -> maybe_ensure_plugin_config(NameVsn) -> Nodes = [N || N <- mria:running_nodes(), N /= node()], - case get_config_from_any_node(Nodes, NameVsn, []) of - {ok, AvroBin} -> - ok = file:write_file(avro_config_file(NameVsn), AvroBin), + case get_avro_config_from_any_node(Nodes, NameVsn, []) of + {ok, AvroJsonMap} when is_map(AvroJsonMap) -> + AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), + ok = file:write_file(avro_config_file(NameVsn), AvroJsonBin), ensure_avro_config(NameVsn); + %% {ok, ?plugin_conf_not_found} -> _ -> - %% always copy default avro file into config dir - %% when can not get config from other nodes - Source = default_avro_config_file(NameVsn), - Destination = avro_config_file(NameVsn), - filelib:is_regular(Source) andalso - case file:copy(Source, Destination) of - {ok, _} -> - ok, - ensure_avro_config(NameVsn); - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_copy_plugin_default_avro_config", - source => Source, - destination => Destination, - reason => Reason - }) - end + cp_default_avro_file(NameVsn), + ensure_avro_config(NameVsn) end. +cp_default_avro_file(NameVsn) -> + %% always copy default avro file into config dir + %% when can not get config from other nodes + Source = default_avro_config_file(NameVsn), + Destination = avro_config_file(NameVsn), + filelib:is_regular(Source) andalso + case file:copy(Source, Destination) of + {ok, _} -> + ok, + ensure_avro_config(NameVsn); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_copy_plugin_default_avro_config", + source => Source, + destination => Destination, + reason => Reason + }) + end. + ensure_avro_config(NameVsn) -> + with_plugin_avsc(NameVsn) andalso + do_ensure_avro_config(NameVsn). + +do_ensure_avro_config(NameVsn) -> case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of {ok, AvroJsonMap} -> {ok, Config} = decode_plugin_avro_config( diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl index 480082e28..01d0a5ce1 100644 --- a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl @@ -21,7 +21,7 @@ -export([ introduced_in/0, get_tar/3, - get_config/3 + get_config/5 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -35,5 +35,5 @@ introduced_in() -> get_tar(Node, NameVsn, Timeout) -> rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). -get_config(Node, NameVsn, Timeout) -> - rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout). +get_config(Node, NameVsn, Opts, Default, Timeout) -> + rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opts, Default], Timeout).