diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 7a73cc72b..c4ef79e17 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -1012,22 +1012,15 @@ maybe_post_op_after_install(NameVsn) -> ok. maybe_load_config_schema(NameVsn) -> - filelib:is_regular(avsc_file_path(NameVsn)) andalso - do_load_config_schema(NameVsn). + AvscPath = avsc_file_path(NameVsn), + filelib:is_regular(AvscPath) andalso + do_load_config_schema(NameVsn, AvscPath). -do_load_config_schema(NameVsn) -> - case read_plugin_avsc(NameVsn, #{read_mode => ?RAW_BIN}) of - {ok, AvscBin} -> - case emqx_plugins_serde:add_schema(NameVsn, AvscBin) of - ok -> ok; - {error, already_exists} -> ok; - {error, _Reason} -> ok - end; - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_read_plugin_avsc", reason => Reason, name_vsn => NameVsn - }), - ok +do_load_config_schema(NameVsn, AvscPath) -> + case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of + ok -> ok; + {error, already_exists} -> ok; + {error, _Reason} -> ok end. maybe_create_config_dir(NameVsn) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_serde.erl b/apps/emqx_plugins/src/emqx_plugins_serde.erl index b936020a6..00fd04b63 100644 --- a/apps/emqx_plugins/src/emqx_plugins_serde.erl +++ b/apps/emqx_plugins/src/emqx_plugins_serde.erl @@ -59,20 +59,20 @@ lookup_serde(SchemaName) -> end. -spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. -add_schema(Name, Avsc) -> - case lookup_serde(Name) of +add_schema(NameVsn, Path) -> + case lookup_serde(NameVsn) of {ok, _Serde} -> - ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => Name}), + ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}), {error, already_exists}; {error, not_found} -> - case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}, infinity) of + case gen_server:call(?MODULE, {build_serdes, to_bin(NameVsn), Path}, infinity) of ok -> - ?SLOG(debug, #{msg => "plugin_schema_added", plugin => Name}), + ?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}), ok; {error, Reason} = E -> ?SLOG(error, #{ msg => "plugin_schema_add_failed", - plugin => Name, + plugin => NameVsn, reason => emqx_utils:readable_error_msg(Reason) }), E @@ -123,15 +123,15 @@ init(_) -> public, ordered_set, {keypos, #plugin_schema_serde.name} ]), State = #{}, - SchemasMap = read_plugin_avsc(), - {ok, State, {continue, {build_serdes, SchemasMap}}}. + AvscPaths = get_plugin_avscs(), + {ok, State, {continue, {build_serdes, AvscPaths}}}. -handle_continue({build_serdes, SchemasMap}, State) -> - _ = build_serdes(SchemasMap), +handle_continue({build_serdes, AvscPaths}, State) -> + _ = build_serdes(AvscPaths), {noreply, State}. -handle_call({build_serdes, NameVsn, Avsc}, _From, State) -> - BuildRes = do_build_serde(NameVsn, Avsc), +handle_call({build_serdes, NameVsn, AvscPath}, _From, State) -> + BuildRes = do_build_serde({NameVsn, AvscPath}), {reply, BuildRes, State}; handle_call(_Call, _From, State) -> {reply, {error, unknown_call}, State}. @@ -149,29 +149,24 @@ terminate(_Reason, _State) -> %% Internal fns %%------------------------------------------------------------------------------------------------- -read_plugin_avsc() -> +-spec get_plugin_avscs() -> [{string(), string()}]. +get_plugin_avscs() -> Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]), lists:foldl( fun(AvscPath, AccIn) -> - case read_avsc_file(AvscPath) of - {ok, Avsc} -> - [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), - AccIn#{to_bin(NameVsn) => Avsc}; - {error, Reason} -> - ?SLOG(warning, Reason), - AccIn - end + [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), + [{NameVsn, AvscPath} | AccIn] end, - _Acc0 = #{}, + _Acc0 = [], filelib:wildcard(Pattern) ). -build_serdes(Schemas) -> - maps:foreach(fun do_build_serde/2, Schemas). +build_serdes(AvscPaths) -> + ok = lists:foreach(fun do_build_serde/1, AvscPaths). -do_build_serde(NameVsn, Avsc) -> +do_build_serde({NameVsn, AvscPath}) -> try - Serde = make_serde(NameVsn, Avsc), + Serde = make_serde(NameVsn, AvscPath), true = ets:insert(?PLUGIN_SERDE_TAB, Serde), ok catch @@ -189,11 +184,12 @@ do_build_serde(NameVsn, Avsc) -> {error, Error} end. -make_serde(NameVsn, Avsc) -> +make_serde(NameVsn, AvscPath) -> + {ok, AvscBin} = read_avsc_file(AvscPath), Store0 = avro_schema_store:new([map]), %% import the schema into the map store with an assigned name %% if it's a named schema (e.g. struct), then Name is added as alias - Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0), + Store = avro_schema_store:import_schema_json(NameVsn, AvscBin, Store0), #plugin_schema_serde{ name = NameVsn, eval_context = Store