fix: call json encoder/decoder for plugin config

This commit is contained in:
JimMoen 2024-04-25 13:40:55 +08:00
parent c180b6a417
commit d2ecccc2ff
No known key found for this signature in database
4 changed files with 35 additions and 29 deletions

View File

@ -176,7 +176,7 @@ schema("/plugins/:name/config") ->
tags => ?TAGS, tags => ?TAGS,
parameters => [hoconsc:ref(name)], parameters => [hoconsc:ref(name)],
responses => #{ responses => #{
%% binary avro encoded config %% avro data, json encoded
200 => hoconsc:mk(binary()), 200 => hoconsc:mk(binary()),
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
} }
@ -190,14 +190,10 @@ schema("/plugins/:name/config") ->
parameters => [hoconsc:ref(name)], parameters => [hoconsc:ref(name)],
'requestBody' => #{ 'requestBody' => #{
content => #{ content => #{
'multipart/form-data' => #{ 'application/json' => #{
schema => #{ schema => #{
type => object, type => object
properties => #{ }
?CONTENT_CONFIG => #{type => string, format => binary}
}
},
encoding => #{?CONTENT_CONFIG => #{'contentType' => 'application/gzip'}}
} }
} }
}, },
@ -499,12 +495,14 @@ plugin_config(get, #{bindings := #{name := Name}}) ->
message => <<"Failed to get plugin config">> message => <<"Failed to get plugin config">>
}} }}
end; end;
plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawAvro}}) -> plugin_config(put, #{bindings := #{name := Name}, body := AvroJsonMap}) ->
case emqx_plugins:decode_plugin_avro_config(Name, RawAvro) of AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
{ok, Config} -> case emqx_plugins:decode_plugin_avro_config(Name, AvroJsonBin) of
{ok, AvroValueConfig} ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
%% cluster call with config in map (binary key-value)
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
Nodes, Name, RawAvro, Config Nodes, Name, AvroJsonMap, AvroValueConfig
), ),
{204}; {204};
{error, Reason} -> {error, Reason} ->
@ -595,8 +593,9 @@ ensure_action(Name, restart) ->
ok. ok.
%% for RPC plugin avro encoded config update %% for RPC plugin avro encoded config update
do_update_plugin_config(Name, Avro, PluginConfig) -> do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) ->
emqx_plugins:put_plugin_config(Name, Avro, PluginConfig). %% TOOD: maybe use `PluginConfigMap` to validate config
emqx_plugins:put_plugin_config(Name, AvroJsonMap, PluginConfigMap).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -59,7 +59,11 @@ ensure_action(Name, Action) ->
map() map()
) -> ) ->
emqx_rpc:multicall_result(). emqx_rpc:multicall_result().
update_plugin_config(Nodes, Name, RawAvro, PluginConfig) -> update_plugin_config(Nodes, Name, AvroJsonMap, PluginConfig) ->
rpc:multicall( rpc:multicall(
Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, [Name, RawAvro, PluginConfig], 10000 Nodes,
emqx_mgmt_api_plugins,
do_update_plugin_config,
[Name, AvroJsonMap, PluginConfig],
10000
). ).

View File

@ -51,7 +51,7 @@
ensure_stopped/1, ensure_stopped/1,
get_plugin_config/1, get_plugin_config/1,
get_plugin_config/2, get_plugin_config/2,
put_plugin_config/3, put_plugin_config/4,
restart/1, restart/1,
list/0 list/0
]). ]).
@ -256,7 +256,7 @@ get_plugin_config(NameVsn) ->
| {error, term()}. | {error, term()}.
get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) -> get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
case read_plugin_avro(NameVsn) of case read_plugin_avro(NameVsn) of
{ok, _AvroBin} = Res -> Res; {ok, _AvroJson} = Res -> Res;
{error, _Reason} = Err -> Err {error, _Reason} = Err -> Err
end; end;
get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) -> get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) ->
@ -265,9 +265,10 @@ get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) ->
%% @doc Update plugin's config. %% @doc Update plugin's config.
%% RPC call from Management API or CLI. %% RPC call from Management API or CLI.
%% the avro binary and plugin config ALWAYS be valid before calling this function. %% the avro binary and plugin config ALWAYS be valid before calling this function.
put_plugin_config(NameVsn, RawAvro, PluginConfig) -> put_plugin_config(NameVsn, AvroJsonMap, DecodedPluginConfig) ->
ok = write_avro_bin(NameVsn, RawAvro), AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), PluginConfig), ok = write_avro_bin(NameVsn, AvroJsonBin),
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
ok. ok.
%% @doc Stop and then start the plugin. %% @doc Stop and then start the plugin.
@ -300,9 +301,11 @@ list() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Package utils %% Package utils
-spec decode_plugin_avro_config(name_vsn(), binary()) -> {ok, map()} | {error, any()}. -spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
decode_plugin_avro_config(NameVsn, RawAvro) -> decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
case emqx_plugins_serde:decode(NameVsn, RawAvro) of decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap));
decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
{ok, Config} -> {ok, Config}; {ok, Config} -> {ok, Config};
{error, ReasonMap} -> {error, ReasonMap} {error, ReasonMap} -> {error, ReasonMap}
end. end.

View File

@ -102,14 +102,14 @@ delete_schema(NameVsn) ->
-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}. -spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
decode(SerdeName, RawData) -> decode(SerdeName, RawData) ->
with_serde( with_serde(
"decode_avro_binary", "decode_avro_json",
eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData]) eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData])
). ).
-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}. -spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
encode(SerdeName, Data) -> encode(SerdeName, Data) ->
with_serde( with_serde(
"encode_avro_data", "encode_avro_json",
eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data]) eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data])
). ).
@ -252,10 +252,10 @@ eval_serde_fun(Op, ErrMsg, SerdeName, Args) ->
end. end.
eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}, {encoding, avro_json}]),
{ok, avro_binary_decoder:decode(Data, Name, Store, Opts)}; {ok, avro_json_decoder:decode_value(Data, Name, Store, Opts)};
eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) -> eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
{ok, avro_binary_encoder:encode(Store, Name, Data)}; {ok, avro_json_encoder:encode(Store, Name, Data)};
eval_serde(_, _, _) -> eval_serde(_, _, _) ->
throw(#{error_msg => "unexpected_plugin_avro_op"}). throw(#{error_msg => "unexpected_plugin_avro_op"}).