fix: put plugin config with binary namevsn key
This commit is contained in:
parent
140b7ce51e
commit
2076681e76
|
@ -24,6 +24,8 @@
|
||||||
-define(CONFIG_FORMAT_AVRO, config_format_avro).
|
-define(CONFIG_FORMAT_AVRO, config_format_avro).
|
||||||
-define(CONFIG_FORMAT_MAP, config_format_map).
|
-define(CONFIG_FORMAT_MAP, config_format_map).
|
||||||
|
|
||||||
|
-define(plugin_conf_not_found, plugin_conf_not_found).
|
||||||
|
|
||||||
-type schema_name() :: binary().
|
-type schema_name() :: binary().
|
||||||
-type avsc_path() :: string().
|
-type avsc_path() :: string().
|
||||||
|
|
||||||
|
|
|
@ -1,8 +0,0 @@
|
||||||
%% -*- mode: erlang -*-
|
|
||||||
{"0.1.0",
|
|
||||||
[ {<<".*">>, []}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
|
||||||
}.
|
|
|
@ -80,7 +80,10 @@
|
||||||
-export([get_tar/1]).
|
-export([get_tar/1]).
|
||||||
|
|
||||||
%% Internal export
|
%% Internal export
|
||||||
-export([do_ensure_started/1]).
|
-export([
|
||||||
|
ensure_avro_config/1,
|
||||||
|
do_ensure_started/1
|
||||||
|
]).
|
||||||
%% for test cases
|
%% for test cases
|
||||||
-export([put_config_internal/2]).
|
-export([put_config_internal/2]).
|
||||||
|
|
||||||
|
@ -150,7 +153,8 @@ ensure_installed(NameVsn) ->
|
||||||
ok;
|
ok;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
ok = purge(NameVsn),
|
ok = purge(NameVsn),
|
||||||
do_ensure_installed(NameVsn)
|
do_ensure_installed(NameVsn),
|
||||||
|
ok = maybe_post_op_after_install(NameVsn)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Ensure files and directories for the given plugin are being deleted.
|
%% @doc Ensure files and directories for the given plugin are being deleted.
|
||||||
|
@ -288,9 +292,13 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
|
||||||
%% @doc Update plugin's config.
|
%% @doc Update plugin's config.
|
||||||
%% RPC call from Management API or CLI.
|
%% RPC call from Management API or CLI.
|
||||||
%% the avro Json Map and plugin config ALWAYS be valid before calling this function.
|
%% the avro Json Map and plugin config ALWAYS be valid before calling this function.
|
||||||
|
put_config(NameVsn, AvroJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
|
||||||
|
put_config(bin(NameVsn), AvroJsonMap, DecodedPluginConfig);
|
||||||
put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
|
put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
|
||||||
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
|
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
|
||||||
ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin),
|
ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin),
|
||||||
|
%% {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
|
||||||
|
%% ok = PluginModule:on_config_changed(NameVsn, AvroJsonMap),
|
||||||
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
|
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -581,8 +589,6 @@ do_ensure_started(NameVsn) ->
|
||||||
case ensure_exists_and_installed(NameVsn) of
|
case ensure_exists_and_installed(NameVsn) of
|
||||||
ok ->
|
ok ->
|
||||||
Plugin = do_read_plugin(NameVsn),
|
Plugin = do_read_plugin(NameVsn),
|
||||||
%% ok = ensure_avro_config(NameVsn);
|
|
||||||
ok = maybe_post_op_after_install(NameVsn),
|
|
||||||
ok = load_code_start_apps(NameVsn, Plugin);
|
ok = load_code_start_apps(NameVsn, Plugin);
|
||||||
{error, plugin_not_found} ->
|
{error, plugin_not_found} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
|
@ -614,7 +620,11 @@ tryit(WhichOp, F) ->
|
||||||
exception => Reason,
|
exception => Reason,
|
||||||
stacktrace => Stacktrace
|
stacktrace => Stacktrace
|
||||||
}),
|
}),
|
||||||
{error, {failed, WhichOp}}
|
{error, #{
|
||||||
|
which_op => WhichOp,
|
||||||
|
exception => Reason,
|
||||||
|
stacktrace => Stacktrace
|
||||||
|
}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% read plugin info from the JSON file
|
%% read plugin info from the JSON file
|
||||||
|
@ -708,10 +718,14 @@ get_from_any_node([Node | T], NameVsn, Errors) ->
|
||||||
get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_config_from_any_node([], _NameVsn, Errors) ->
|
get_avro_config_from_any_node([], _NameVsn, Errors) ->
|
||||||
{error, Errors};
|
{error, Errors};
|
||||||
get_config_from_any_node([Node | T], NameVsn, Errors) ->
|
get_avro_config_from_any_node([Node | T], NameVsn, Errors) ->
|
||||||
case emqx_plugins_proto_v2:get_config(Node, NameVsn, 5_000) of
|
case
|
||||||
|
emqx_plugins_proto_v2:get_config(
|
||||||
|
Node, NameVsn, #{format => ?CONFIG_FORMAT_MAP}, ?plugin_conf_not_found, 5_000
|
||||||
|
)
|
||||||
|
of
|
||||||
{ok, _} = Res ->
|
{ok, _} = Res ->
|
||||||
Res;
|
Res;
|
||||||
Err ->
|
Err ->
|
||||||
|
@ -1041,6 +1055,8 @@ for_plugins(ActionFun) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
|
for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
|
||||||
|
%% always ensure the plugin is installed
|
||||||
|
ok = ensure_avro_config(NameVsn),
|
||||||
case Fun(NameVsn) of
|
case Fun(NameVsn) of
|
||||||
ok -> [];
|
ok -> [];
|
||||||
{error, Reason} -> [{NameVsn, Reason}]
|
{error, Reason} -> [{NameVsn, Reason}]
|
||||||
|
@ -1094,31 +1110,41 @@ do_create_config_dir(NameVsn) ->
|
||||||
|
|
||||||
maybe_ensure_plugin_config(NameVsn) ->
|
maybe_ensure_plugin_config(NameVsn) ->
|
||||||
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
||||||
case get_config_from_any_node(Nodes, NameVsn, []) of
|
case get_avro_config_from_any_node(Nodes, NameVsn, []) of
|
||||||
{ok, AvroBin} ->
|
{ok, AvroJsonMap} when is_map(AvroJsonMap) ->
|
||||||
ok = file:write_file(avro_config_file(NameVsn), AvroBin),
|
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
|
||||||
|
ok = file:write_file(avro_config_file(NameVsn), AvroJsonBin),
|
||||||
ensure_avro_config(NameVsn);
|
ensure_avro_config(NameVsn);
|
||||||
|
%% {ok, ?plugin_conf_not_found} ->
|
||||||
_ ->
|
_ ->
|
||||||
%% always copy default avro file into config dir
|
cp_default_avro_file(NameVsn),
|
||||||
%% when can not get config from other nodes
|
ensure_avro_config(NameVsn)
|
||||||
Source = default_avro_config_file(NameVsn),
|
|
||||||
Destination = avro_config_file(NameVsn),
|
|
||||||
filelib:is_regular(Source) andalso
|
|
||||||
case file:copy(Source, Destination) of
|
|
||||||
{ok, _} ->
|
|
||||||
ok,
|
|
||||||
ensure_avro_config(NameVsn);
|
|
||||||
{error, Reason} ->
|
|
||||||
?SLOG(warning, #{
|
|
||||||
msg => "failed_to_copy_plugin_default_avro_config",
|
|
||||||
source => Source,
|
|
||||||
destination => Destination,
|
|
||||||
reason => Reason
|
|
||||||
})
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
cp_default_avro_file(NameVsn) ->
|
||||||
|
%% always copy default avro file into config dir
|
||||||
|
%% when can not get config from other nodes
|
||||||
|
Source = default_avro_config_file(NameVsn),
|
||||||
|
Destination = avro_config_file(NameVsn),
|
||||||
|
filelib:is_regular(Source) andalso
|
||||||
|
case file:copy(Source, Destination) of
|
||||||
|
{ok, _} ->
|
||||||
|
ok,
|
||||||
|
ensure_avro_config(NameVsn);
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "failed_to_copy_plugin_default_avro_config",
|
||||||
|
source => Source,
|
||||||
|
destination => Destination,
|
||||||
|
reason => Reason
|
||||||
|
})
|
||||||
|
end.
|
||||||
|
|
||||||
ensure_avro_config(NameVsn) ->
|
ensure_avro_config(NameVsn) ->
|
||||||
|
with_plugin_avsc(NameVsn) andalso
|
||||||
|
do_ensure_avro_config(NameVsn).
|
||||||
|
|
||||||
|
do_ensure_avro_config(NameVsn) ->
|
||||||
case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of
|
case read_plugin_avro(NameVsn, #{read_mode => ?JSON_MAP}) of
|
||||||
{ok, AvroJsonMap} ->
|
{ok, AvroJsonMap} ->
|
||||||
{ok, Config} = decode_plugin_avro_config(
|
{ok, Config} = decode_plugin_avro_config(
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
-export([
|
-export([
|
||||||
introduced_in/0,
|
introduced_in/0,
|
||||||
get_tar/3,
|
get_tar/3,
|
||||||
get_config/3
|
get_config/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -35,5 +35,5 @@ introduced_in() ->
|
||||||
get_tar(Node, NameVsn, Timeout) ->
|
get_tar(Node, NameVsn, Timeout) ->
|
||||||
rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).
|
rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).
|
||||||
|
|
||||||
get_config(Node, NameVsn, Timeout) ->
|
get_config(Node, NameVsn, Opts, Default, Timeout) ->
|
||||||
rpc:call(Node, emqx_plugins, get_config, [NameVsn], Timeout).
|
rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opts, Default], Timeout).
|
||||||
|
|
Loading…
Reference in New Issue