Merge pull request #13101 from JimMoen/fix-plugin-config-map
fix: allow put plugin config without schema
This commit is contained in:
commit
40288b99b2
|
@ -20,6 +20,7 @@
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
|
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
|
||||||
|
-include_lib("erlavro/include/erlavro.hrl").
|
||||||
|
|
||||||
-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}).
|
-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}).
|
||||||
|
|
||||||
|
@ -506,14 +507,20 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) ->
|
||||||
{404, plugin_not_found_msg()}
|
{404, plugin_not_found_msg()}
|
||||||
end;
|
end;
|
||||||
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
|
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
|
||||||
|
Nodes = emqx:running_nodes(),
|
||||||
case emqx_plugins:describe(NameVsn) of
|
case emqx_plugins:describe(NameVsn) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
|
case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
|
||||||
{ok, AvroValueConfig} ->
|
{ok, ?plugin_without_config_schema} ->
|
||||||
Nodes = emqx:running_nodes(),
|
%% no plugin avro schema, just put the json map as-is
|
||||||
|
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
|
||||||
|
Nodes, NameVsn, AvroJsonMap, ?plugin_without_config_schema
|
||||||
|
),
|
||||||
|
{204};
|
||||||
|
{ok, AvroValue} ->
|
||||||
%% cluster call with config in map (binary key-value)
|
%% cluster call with config in map (binary key-value)
|
||||||
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
|
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
|
||||||
Nodes, NameVsn, AvroJsonMap, AvroValueConfig
|
Nodes, NameVsn, AvroJsonMap, AvroValue
|
||||||
),
|
),
|
||||||
{204};
|
{204};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -604,9 +611,13 @@ ensure_action(Name, restart) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% for RPC plugin avro encoded config update
|
%% for RPC plugin avro encoded config update
|
||||||
do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) ->
|
-spec do_update_plugin_config(
|
||||||
%% TODO: maybe use `PluginConfigMap` to validate config
|
name_vsn(), map(), avro_value() | ?plugin_without_config_schema
|
||||||
emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap).
|
) ->
|
||||||
|
ok.
|
||||||
|
do_update_plugin_config(NameVsn, AvroJsonMap, AvroValue) ->
|
||||||
|
%% TODO: maybe use `AvroValue` to validate config
|
||||||
|
emqx_plugins:put_config(NameVsn, AvroJsonMap, AvroValue).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
|
||||||
|
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.7.0".
|
"5.7.0".
|
||||||
|
@ -56,14 +57,14 @@ ensure_action(Name, Action) ->
|
||||||
[node()],
|
[node()],
|
||||||
binary() | string(),
|
binary() | string(),
|
||||||
binary(),
|
binary(),
|
||||||
map()
|
map() | ?plugin_without_config_schema
|
||||||
) ->
|
) ->
|
||||||
emqx_rpc:multicall_result().
|
emqx_rpc:multicall_result().
|
||||||
update_plugin_config(Nodes, NameVsn, AvroJsonMap, PluginConfig) ->
|
update_plugin_config(Nodes, NameVsn, AvroJsonMap, MaybeAvroValue) ->
|
||||||
rpc:multicall(
|
rpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
emqx_mgmt_api_plugins,
|
emqx_mgmt_api_plugins,
|
||||||
do_update_plugin_config,
|
do_update_plugin_config,
|
||||||
[NameVsn, AvroJsonMap, PluginConfig],
|
[NameVsn, AvroJsonMap, MaybeAvroValue],
|
||||||
10000
|
10000
|
||||||
).
|
).
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
-define(CONFIG_FORMAT_MAP, config_format_map).
|
-define(CONFIG_FORMAT_MAP, config_format_map).
|
||||||
|
|
||||||
-define(plugin_conf_not_found, plugin_conf_not_found).
|
-define(plugin_conf_not_found, plugin_conf_not_found).
|
||||||
|
-define(plugin_without_config_schema, plugin_without_config_schema).
|
||||||
|
|
||||||
-type schema_name() :: binary().
|
-type schema_name() :: binary().
|
||||||
-type avsc_path() :: string().
|
-type avsc_path() :: string().
|
||||||
|
|
|
@ -161,7 +161,9 @@ ensure_installed(NameVsn) ->
|
||||||
ok = purge(NameVsn),
|
ok = purge(NameVsn),
|
||||||
case ensure_exists_and_installed(NameVsn) of
|
case ensure_exists_and_installed(NameVsn) of
|
||||||
ok ->
|
ok ->
|
||||||
maybe_post_op_after_installed(NameVsn);
|
maybe_post_op_after_installed(NameVsn),
|
||||||
|
_ = maybe_ensure_plugin_config(NameVsn),
|
||||||
|
ok;
|
||||||
{error, _Reason} = Err ->
|
{error, _Reason} = Err ->
|
||||||
Err
|
Err
|
||||||
end
|
end
|
||||||
|
@ -328,10 +330,11 @@ get_config_bin(NameVsn) ->
|
||||||
|
|
||||||
%% @doc Update plugin's config.
|
%% @doc Update plugin's config.
|
||||||
%% RPC call from Management API or CLI.
|
%% RPC call from Management API or CLI.
|
||||||
%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function.
|
%% The plugin config Json Map was valid by avro schema
|
||||||
put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
|
%% Or: if no and plugin config ALWAYS be valid before calling this function.
|
||||||
put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig);
|
put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) ->
|
||||||
put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) ->
|
put_config(bin(NameVsn), ConfigJsonMap, AvroValue);
|
||||||
|
put_config(NameVsn, ConfigJsonMap, _AvroValue) ->
|
||||||
HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
|
HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
|
||||||
ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
|
ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
|
||||||
%% TODO: callback in plugin's on_config_changed (config update by mgmt API)
|
%% TODO: callback in plugin's on_config_changed (config update by mgmt API)
|
||||||
|
@ -369,10 +372,30 @@ list() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Package utils
|
%% Package utils
|
||||||
|
|
||||||
-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
|
-spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
|
||||||
decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
|
{ok, map() | ?plugin_without_config_schema}
|
||||||
decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
|
| {error, any()}.
|
||||||
decode_plugin_config_map(NameVsn, AvroJsonBin) ->
|
decode_plugin_config_map(NameVsn, AvroJsonMap) ->
|
||||||
|
case with_plugin_avsc(NameVsn) of
|
||||||
|
true ->
|
||||||
|
case emqx_plugins_serde:lookup_serde(NameVsn) of
|
||||||
|
{error, not_found} ->
|
||||||
|
Reason = "plugin_config_schema_serde_not_found",
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
|
||||||
|
}),
|
||||||
|
{error, Reason};
|
||||||
|
{ok, _Serde} ->
|
||||||
|
do_decode_plugin_config_map(NameVsn, AvroJsonMap)
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
?SLOG(debug, #{name_vsn => NameVsn, plugin_with_avro_schema => false}),
|
||||||
|
{ok, ?plugin_without_config_schema}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
|
||||||
|
do_decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
|
||||||
|
do_decode_plugin_config_map(NameVsn, AvroJsonBin) ->
|
||||||
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
|
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
|
||||||
{ok, Config} -> {ok, Config};
|
{ok, Config} -> {ok, Config};
|
||||||
{error, ReasonMap} -> {error, ReasonMap}
|
{error, ReasonMap} -> {error, ReasonMap}
|
||||||
|
@ -1144,6 +1167,7 @@ do_create_config_dir(NameVsn) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec maybe_ensure_plugin_config(name_vsn()) -> ok.
|
||||||
maybe_ensure_plugin_config(NameVsn) ->
|
maybe_ensure_plugin_config(NameVsn) ->
|
||||||
maybe
|
maybe
|
||||||
true ?= with_plugin_avsc(NameVsn),
|
true ?= with_plugin_avsc(NameVsn),
|
||||||
|
@ -1152,10 +1176,13 @@ maybe_ensure_plugin_config(NameVsn) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec ensure_plugin_config(name_vsn()) -> ok.
|
||||||
ensure_plugin_config(NameVsn) ->
|
ensure_plugin_config(NameVsn) ->
|
||||||
%% fetch plugin hocon config from cluster
|
%% fetch plugin hocon config from cluster
|
||||||
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
Nodes = [N || N <- mria:running_nodes(), N /= node()],
|
||||||
ensure_plugin_config(NameVsn, Nodes).
|
ensure_plugin_config(NameVsn, Nodes).
|
||||||
|
|
||||||
|
-spec ensure_plugin_config(name_vsn(), list()) -> ok.
|
||||||
ensure_plugin_config(NameVsn, []) ->
|
ensure_plugin_config(NameVsn, []) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
msg => "default_plugin_config_used",
|
msg => "default_plugin_config_used",
|
||||||
|
@ -1167,7 +1194,9 @@ ensure_plugin_config(NameVsn, Nodes) ->
|
||||||
case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
|
case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
|
||||||
{ok, ConfigMap} when is_map(ConfigMap) ->
|
{ok, ConfigMap} when is_map(ConfigMap) ->
|
||||||
HoconBin = hocon_pp:do(ConfigMap, #{}),
|
HoconBin = hocon_pp:do(ConfigMap, #{}),
|
||||||
ok = file:write_file(plugin_config_file(NameVsn), HoconBin),
|
Path = plugin_config_file(NameVsn),
|
||||||
|
ok = filelib:ensure_dir(Path),
|
||||||
|
ok = file:write_file(Path, HoconBin),
|
||||||
ensure_config_map(NameVsn);
|
ensure_config_map(NameVsn);
|
||||||
_ ->
|
_ ->
|
||||||
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
|
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
|
||||||
|
@ -1176,6 +1205,7 @@ ensure_plugin_config(NameVsn, Nodes) ->
|
||||||
cp_default_config_file(NameVsn)
|
cp_default_config_file(NameVsn)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec cp_default_config_file(name_vsn()) -> ok.
|
||||||
cp_default_config_file(NameVsn) ->
|
cp_default_config_file(NameVsn) ->
|
||||||
%% always copy default hocon file into config dir when can not get config from other nodes
|
%% always copy default hocon file into config dir when can not get config from other nodes
|
||||||
Source = default_plugin_config_file(NameVsn),
|
Source = default_plugin_config_file(NameVsn),
|
||||||
|
@ -1184,9 +1214,10 @@ cp_default_config_file(NameVsn) ->
|
||||||
true ?= filelib:is_regular(Source),
|
true ?= filelib:is_regular(Source),
|
||||||
%% destination path not existed (not configured)
|
%% destination path not existed (not configured)
|
||||||
true ?= (not filelib:is_regular(Destination)),
|
true ?= (not filelib:is_regular(Destination)),
|
||||||
|
ok = filelib:ensure_dir(Destination),
|
||||||
case file:copy(Source, Destination) of
|
case file:copy(Source, Destination) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
ok;
|
ensure_config_map(NameVsn);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "failed_to_copy_plugin_default_hocon_config",
|
msg => "failed_to_copy_plugin_default_hocon_config",
|
||||||
|
@ -1200,14 +1231,15 @@ cp_default_config_file(NameVsn) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_config_map(NameVsn) ->
|
ensure_config_map(NameVsn) ->
|
||||||
with_plugin_avsc(NameVsn) andalso
|
|
||||||
do_ensure_config_map(NameVsn).
|
|
||||||
|
|
||||||
do_ensure_config_map(NameVsn) ->
|
|
||||||
case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
|
case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
|
||||||
{ok, ConfigJsonMap} ->
|
{ok, ConfigJsonMap} ->
|
||||||
{ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap),
|
case with_plugin_avsc(NameVsn) of
|
||||||
put_config(NameVsn, ConfigJsonMap, Config);
|
true ->
|
||||||
|
{ok, AvroValue} = decode_plugin_config_map(NameVsn, ConfigJsonMap),
|
||||||
|
put_config(NameVsn, ConfigJsonMap, AvroValue);
|
||||||
|
false ->
|
||||||
|
put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema)
|
||||||
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
|
?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
|
||||||
ok
|
ok
|
||||||
|
|
Loading…
Reference in New Issue