From 03519d7e61817acbc279e08194858d32fff80831 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 3 Jun 2021 10:55:37 +0800 Subject: [PATCH] fix(emqx_resource): HTTP APIs for emqx_resource not working --- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/rebar.config | 2 +- apps/emqx_resource/src/emqx_resource.app.src | 3 +- apps/emqx_resource/src/emqx_resource.erl | 54 ++++++++++--- apps/emqx_resource/src/emqx_resource_api.erl | 17 ++-- .../src/emqx_resource_instance.erl | 78 ++++++++++++------- .../src/emqx_resource_transform.erl | 16 ++-- 7 files changed, 114 insertions(+), 58 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 1f75b453e..123854bc9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -type resource_type() :: module(). -type instance_id() :: binary(). --type resource_config() :: jsx:json_term(). +-type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_data() :: #{ diff --git a/apps/emqx_resource/rebar.config b/apps/emqx_resource/rebar.config index 4e88043de..66271ee56 100644 --- a/apps/emqx_resource/rebar.config +++ b/apps/emqx_resource/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [ debug_info , nowarn_unused_import - %, {d, 'RESOURCE_DEBUG'} + , {d, 'RESOURCE_DEBUG'} ]}. {erl_first_files, ["src/emqx_resource_transform.erl"]}. diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index af9f48cc6..13330b061 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -7,7 +7,8 @@ [kernel, stdlib, gproc, - hocon + hocon, + jsx ]}, {env,[]}, {modules, []}, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b9107d328..c432e683e 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -65,13 +65,17 @@ , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance , call_config_merge/4 %% merge the config when updating + , call_jsonify/2 + , call_api_reply_format/2 ]). -export([ list_instances/0 %% list all the instances, id only. , list_instances_verbose/0 %% list all the instances , get_instance/1 %% return the data of the instance , get_instance_by_type/1 %% return all the instances of the same resource type - , load_instances/1 %% load instances from config files + , load_instances_from_dir/1 %% load instances from a directory + , load_instance_from_file/1 %% load an instance from a config file + , load_instance_from_config/1 %% load an instance from a map or json-string config % , dependents/1 % , inc_counter/2 %% increment the counter of the instance % , inc_counter/3 %% increment the counter by a given integer @@ -81,14 +85,17 @@ -optional_callbacks([ on_query/4 , on_health_check/2 - , on_api_reply_format/1 , on_config_merge/3 + , on_jsonify/1 + , on_api_reply_format/1 ]). --callback on_api_reply_format(resource_data()) -> map(). +-callback on_api_reply_format(resource_data()) -> jsx:json_term(). -callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). +-callback on_jsonify(resource_config()) -> jsx:json_term(). + %% when calling emqx_resource:start/1 -callback on_start(instance_id(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -208,9 +215,17 @@ list_instances_verbose() -> get_instance_by_type(ResourceType) -> emqx_resource_instance:lookup_by_type(ResourceType). --spec load_instances(Dir :: string()) -> ok. -load_instances(Dir) -> - emqx_resource_instance:load(Dir). +-spec load_instances_from_dir(Dir :: string()) -> ok. +load_instances_from_dir(Dir) -> + emqx_resource_instance:load_dir(Dir). + +-spec load_instance_from_file(File :: string()) -> ok. +load_instance_from_file(File) -> + emqx_resource_instance:load_file(File). + +-spec load_instance_from_config(binary() | map()) -> ok. +load_instance_from_config(Config) -> + emqx_resource_instance:load_config(Config). -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -229,7 +244,28 @@ call_stop(InstId, Mod, ResourceState) -> -spec call_config_merge(module(), resource_config(), resource_config(), term()) -> resource_config(). call_config_merge(Mod, OldConfig, NewConfig, Params) -> - ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)). + case erlang:function_exported(Mod, on_jsonify, 1) of + true -> + ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)); + false when is_map(OldConfig), is_map(NewConfig) -> + maps:merge(OldConfig, NewConfig); + false -> + NewConfig + end. + +-spec call_jsonify(module(), resource_config()) -> jsx:json_term(). +call_jsonify(Mod, Config) -> + case erlang:function_exported(Mod, on_jsonify, 1) of + false -> Config; + true -> ?SAFE_CALL(Mod:on_jsonify(Config)) + end. + +-spec call_api_reply_format(module(), resource_data()) -> jsx:json_term(). +call_api_reply_format(Mod, Data) -> + case erlang:function_exported(Mod, on_api_reply_format, 1) of + false -> emqx_resource_api:default_api_reply_format(Data); + true -> ?SAFE_CALL(Mod:on_api_reply_format(Data)) + end. -spec parse_config(resource_type(), binary() | term()) -> {ok, resource_config()} | {error, term()}. @@ -240,7 +276,7 @@ parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> Error -> Error end; parse_config(ResourceType, RawConfigTerm) -> - parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})). + parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). -spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. do_parse_config(ResourceType, MapConfig) -> @@ -261,7 +297,7 @@ resource_type_from_str(ResourceType) -> false -> {error, {invalid_resource, Mod}} end catch error:badarg -> - {error, {not_found, ResourceType}} + {error, {resource_not_found, ResourceType}} end. call_instance(InstId, Query) -> diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl index cc32d8a11..1e19cdede 100644 --- a/apps/emqx_resource/src/emqx_resource_api.erl +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -20,6 +20,9 @@ , put/3 , delete/3 ]). + +-export([default_api_reply_format/1]). + get_all(Mod, _Binding, _Params) -> {200, #{code => 0, data => [format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}. @@ -34,7 +37,7 @@ get(Mod, #{id := Id}, _Params) -> put(Mod, #{id := Id}, Params) -> ConfigParams = proplists:get_value(<<"config">>, Params), - ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params), + ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}), case emqx_resource:resource_type_from_str(ResourceTypeStr) of {ok, ResourceType} -> do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); @@ -63,15 +66,11 @@ delete(_Mod, #{id := Id}, _Params) -> end. format_data(Mod, Data) -> - case erlang:function_exported(Mod, on_api_reply_format, 1) of - false -> - default_api_reply_format(Data); - true -> - Mod:on_api_reply_format(Data) - end. + emqx_resource:call_api_reply_format(Mod, Data). -default_api_reply_format(#{id := Id, status := Status, config := Config}) -> - #{node => node(), id => Id, status => Status, config => Config}. +default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) -> + #{node => node(), id => Id, status => Status, resource_type => Mod, + config => emqx_resource:call_jsonify(Mod, Config)}. stringnify(Bin) when is_binary(Bin) -> Bin; stringnify(Str) when is_list(Str) -> list_to_binary(Str); diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index ca5e4829e..dbbf052a1 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -23,10 +23,13 @@ -export([start_link/2]). %% load resource instances from *.conf files --export([ load/1 +-export([ load_dir/1 + , load_file/1 + , load_config/1 , lookup/1 , list_all/0 , lookup_by_type/1 + , create_local/3 ]). -export([ hash_call/2 @@ -82,8 +85,8 @@ lookup_by_type(ResourceType) -> [Data || #{mod := Mod} = Data <- list_all() , Mod =:= ResourceType]. --spec load(Dir :: string()) -> ok. -load(Dir) -> +-spec load_dir(Dir :: string()) -> ok. +load_dir(Dir) -> lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). load_file(File) -> @@ -91,40 +94,51 @@ load_file(File) -> {error, Reason} -> logger:error("load resource from ~p failed: ~p", [File, Reason]); RawConfig -> - case hocon:binary(RawConfig, #{format => map}) of - {ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, - <<"config">> := MapConfig}} -> - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> - parse_and_load_config(Id, ResourceType, MapConfig); - {error, Reason} -> - logger:error("no such resource type: ~s, ~p", - [ResourceTypeStr, Reason]) - end; + case load_config(RawConfig) of + {ok, Data} -> + logger:debug("loaded resource instance from file: ~p, data: ~p", + [File, Data]); {error, Reason} -> logger:error("load resource from ~p failed: ~p", [File, Reason]) end end. -parse_and_load_config(InstId, ResourceType, MapConfig) -> - case emqx_resource:parse_config(ResourceType, MapConfig) of - {error, Reason} -> - logger:error("parse config for resource ~p of type ~p failed: ~p", - [InstId, ResourceType, Reason]); - {ok, InstConf} -> - create_instance_local(InstId, ResourceType, InstConf) +-spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}. +load_config(RawConfig) when is_binary(RawConfig) -> + case hocon:binary(RawConfig, #{format => map}) of + {ok, ConfigTerm} -> load_config(ConfigTerm); + Error -> Error + end; + +load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) -> + MapConfig = maps:get(<<"config">>, Config, #{}), + case emqx_resource:resource_type_from_str(ResourceTypeStr) of + {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); + Error -> Error end. -create_instance_local(InstId, ResourceType, InstConf) -> - case do_create(InstId, ResourceType, InstConf) of - {ok, Data} -> - logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p", - [ResourceType, InstId, InstConf, Data]); - {error, Reason} -> - logger:error("create ~p resource instance: ~p failed: ~p, config: ~p", - [ResourceType, InstId, Reason, InstConf]) +parse_and_load_config(InstId, ResourceType, MapConfig) -> + case emqx_resource:parse_config(ResourceType, MapConfig) of + {ok, InstConf} -> create_local(InstId, ResourceType, InstConf); + Error -> Error end. +create_local(InstId, ResourceType, InstConf) -> + case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of + {ok, Data} -> {ok, Data}; + Error -> Error + end. + +save_config_to_disk(InstId, ResourceType, Config) -> + %% TODO: send an event to the config handler, and the hander (single process) + %% will dump configs for all instances (from an ETS table) to a file. + file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]), + jsx:encode(#{id => InstId, resource_type => ResourceType, + config => emqx_resource:call_jsonify(ResourceType, Config)})). + +emqx_data_dir() -> + "data". + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -205,7 +219,13 @@ do_create(InstId, ResourceType, Config) -> #{mod => ResourceType, config => Config, state => ResourceState, status => stopped}}), _ = do_health_check(InstId), - {ok, force_lookup(InstId)}; + case save_config_to_disk(InstId, ResourceType, Config) of + ok -> {ok, force_lookup(InstId)}; + {error, Reason} -> + logger:error("save config for ~p resource ~p to disk failed: ~p", + [ResourceType, InstId, Reason]), + {error, Reason} + end; {error, Reason} -> logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]), {error, Reason} diff --git a/apps/emqx_resource/src/emqx_resource_transform.erl b/apps/emqx_resource/src/emqx_resource_transform.erl index 23e14013c..cd6c7e4ae 100644 --- a/apps/emqx_resource/src/emqx_resource_transform.erl +++ b/apps/emqx_resource/src/emqx_resource_transform.erl @@ -57,7 +57,8 @@ forms(_, []) -> []. form(Mod, Form) -> case Form of ?Q("-emqx_resource_api_path('@Path').") -> - {fix_spec_attrs() ++ fix_api_attrs(erl_syntax:concrete(Path)) ++ fix_api_exports(), + {fix_spec_attrs() ++ fix_api_attrs(Mod, erl_syntax:concrete(Path)) + ++ fix_api_exports(), [], fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)}; _ -> @@ -75,18 +76,17 @@ fix_spec_funcs(_Mod) -> , ?Q("structs() -> [\"config\"].") ]. -fix_api_attrs(Path0) -> - BaseName = filename:basename(Path0), - Path = "/" ++ BaseName, +fix_api_attrs(Mod, Path) -> + BaseName = atom_to_list(Mod), [erl_syntax:revert( erl_syntax:attribute(?Q("rest_api"), [ erl_syntax:abstract(#{ - name => list_to_atom(Name ++ "_log_tracers"), + name => list_to_atom(Act ++ "_" ++ BaseName), method => Method, path => mk_path(Path, WithId), func => Func, - descr => Name ++ " the " ++ BaseName})])) - || {Name, Method, WithId, Func} <- [ + descr => Act ++ " the " ++ BaseName})])) + || {Act, Method, WithId, Func} <- [ {"list", 'GET', noid, api_get_all}, {"get", 'GET', id, api_get}, {"update", 'PUT', id, api_put}, @@ -110,5 +110,5 @@ fix_api_funcs(Mod) -> emqx_resource_api:delete('@Mod@', Binding, Params).")) ]. -mk_path(Path, id) -> Path ++ "/:bin:id"; +mk_path(Path, id) -> string:trim(Path, trailing, "/") ++ "/:bin:id"; mk_path(Path, noid) -> Path.