refactor: read avsc file when make serde

This commit is contained in:
JimMoen 2024-04-25 17:01:16 +08:00
parent 670ddae57c
commit e5bd747b32
No known key found for this signature in database
2 changed files with 32 additions and 43 deletions

View File

@ -1012,22 +1012,15 @@ maybe_post_op_after_install(NameVsn) ->
ok. ok.
maybe_load_config_schema(NameVsn) -> maybe_load_config_schema(NameVsn) ->
filelib:is_regular(avsc_file_path(NameVsn)) andalso AvscPath = avsc_file_path(NameVsn),
do_load_config_schema(NameVsn). filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath).
do_load_config_schema(NameVsn) -> do_load_config_schema(NameVsn, AvscPath) ->
case read_plugin_avsc(NameVsn, #{read_mode => ?RAW_BIN}) of case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
{ok, AvscBin} ->
case emqx_plugins_serde:add_schema(NameVsn, AvscBin) of
ok -> ok; ok -> ok;
{error, already_exists} -> ok; {error, already_exists} -> ok;
{error, _Reason} -> ok {error, _Reason} -> ok
end;
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_read_plugin_avsc", reason => Reason, name_vsn => NameVsn
}),
ok
end. end.
maybe_create_config_dir(NameVsn) -> maybe_create_config_dir(NameVsn) ->

View File

@ -59,20 +59,20 @@ lookup_serde(SchemaName) ->
end. end.
-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. -spec add_schema(schema_name(), avsc()) -> ok | {error, term()}.
add_schema(Name, Avsc) -> add_schema(NameVsn, Path) ->
case lookup_serde(Name) of case lookup_serde(NameVsn) of
{ok, _Serde} -> {ok, _Serde} ->
?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => Name}), ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}),
{error, already_exists}; {error, already_exists};
{error, not_found} -> {error, not_found} ->
case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}, infinity) of case gen_server:call(?MODULE, {build_serdes, to_bin(NameVsn), Path}, infinity) of
ok -> ok ->
?SLOG(debug, #{msg => "plugin_schema_added", plugin => Name}), ?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}),
ok; ok;
{error, Reason} = E -> {error, Reason} = E ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "plugin_schema_add_failed", msg => "plugin_schema_add_failed",
plugin => Name, plugin => NameVsn,
reason => emqx_utils:readable_error_msg(Reason) reason => emqx_utils:readable_error_msg(Reason)
}), }),
E E
@ -123,15 +123,15 @@ init(_) ->
public, ordered_set, {keypos, #plugin_schema_serde.name} public, ordered_set, {keypos, #plugin_schema_serde.name}
]), ]),
State = #{}, State = #{},
SchemasMap = read_plugin_avsc(), AvscPaths = get_plugin_avscs(),
{ok, State, {continue, {build_serdes, SchemasMap}}}. {ok, State, {continue, {build_serdes, AvscPaths}}}.
handle_continue({build_serdes, SchemasMap}, State) -> handle_continue({build_serdes, AvscPaths}, State) ->
_ = build_serdes(SchemasMap), _ = build_serdes(AvscPaths),
{noreply, State}. {noreply, State}.
handle_call({build_serdes, NameVsn, Avsc}, _From, State) -> handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
BuildRes = do_build_serde(NameVsn, Avsc), BuildRes = do_build_serde({NameVsn, AvscPath}),
{reply, BuildRes, State}; {reply, BuildRes, State};
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, {error, unknown_call}, State}. {reply, {error, unknown_call}, State}.
@ -149,29 +149,24 @@ terminate(_Reason, _State) ->
%% Internal fns %% Internal fns
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
read_plugin_avsc() -> -spec get_plugin_avscs() -> [{string(), string()}].
get_plugin_avscs() ->
Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]),
lists:foldl( lists:foldl(
fun(AvscPath, AccIn) -> fun(AvscPath, AccIn) ->
case read_avsc_file(AvscPath) of
{ok, Avsc} ->
[_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
AccIn#{to_bin(NameVsn) => Avsc}; [{NameVsn, AvscPath} | AccIn]
{error, Reason} ->
?SLOG(warning, Reason),
AccIn
end
end, end,
_Acc0 = #{}, _Acc0 = [],
filelib:wildcard(Pattern) filelib:wildcard(Pattern)
). ).
build_serdes(Schemas) -> build_serdes(AvscPaths) ->
maps:foreach(fun do_build_serde/2, Schemas). ok = lists:foreach(fun do_build_serde/1, AvscPaths).
do_build_serde(NameVsn, Avsc) -> do_build_serde({NameVsn, AvscPath}) ->
try try
Serde = make_serde(NameVsn, Avsc), Serde = make_serde(NameVsn, AvscPath),
true = ets:insert(?PLUGIN_SERDE_TAB, Serde), true = ets:insert(?PLUGIN_SERDE_TAB, Serde),
ok ok
catch catch
@ -189,11 +184,12 @@ do_build_serde(NameVsn, Avsc) ->
{error, Error} {error, Error}
end. end.
make_serde(NameVsn, Avsc) -> make_serde(NameVsn, AvscPath) ->
{ok, AvscBin} = read_avsc_file(AvscPath),
Store0 = avro_schema_store:new([map]), Store0 = avro_schema_store:new([map]),
%% import the schema into the map store with an assigned name %% import the schema into the map store with an assigned name
%% if it's a named schema (e.g. struct), then Name is added as alias %% if it's a named schema (e.g. struct), then Name is added as alias
Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0), Store = avro_schema_store:import_schema_json(NameVsn, AvscBin, Store0),
#plugin_schema_serde{ #plugin_schema_serde{
name = NameVsn, name = NameVsn,
eval_context = Store eval_context = Store