fix(emqx_resource): HTTP APIs for emqx_resource not working
This commit is contained in:
parent
624c4ecedc
commit
03519d7e61
|
@ -15,7 +15,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-type resource_type() :: module().
|
-type resource_type() :: module().
|
||||||
-type instance_id() :: binary().
|
-type instance_id() :: binary().
|
||||||
-type resource_config() :: jsx:json_term().
|
-type resource_config() :: term().
|
||||||
-type resource_spec() :: map().
|
-type resource_spec() :: map().
|
||||||
-type resource_state() :: term().
|
-type resource_state() :: term().
|
||||||
-type resource_data() :: #{
|
-type resource_data() :: #{
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{erl_opts, [ debug_info
|
{erl_opts, [ debug_info
|
||||||
, nowarn_unused_import
|
, nowarn_unused_import
|
||||||
%, {d, 'RESOURCE_DEBUG'}
|
, {d, 'RESOURCE_DEBUG'}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{erl_first_files, ["src/emqx_resource_transform.erl"]}.
|
{erl_first_files, ["src/emqx_resource_transform.erl"]}.
|
||||||
|
|
|
@ -7,7 +7,8 @@
|
||||||
[kernel,
|
[kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
gproc,
|
gproc,
|
||||||
hocon
|
hocon,
|
||||||
|
jsx
|
||||||
]},
|
]},
|
||||||
{env,[]},
|
{env,[]},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
|
@ -65,13 +65,17 @@
|
||||||
, call_health_check/3 %% verify if the resource is working normally
|
, call_health_check/3 %% verify if the resource is working normally
|
||||||
, call_stop/3 %% stop the instance
|
, call_stop/3 %% stop the instance
|
||||||
, call_config_merge/4 %% merge the config when updating
|
, call_config_merge/4 %% merge the config when updating
|
||||||
|
, call_jsonify/2
|
||||||
|
, call_api_reply_format/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ list_instances/0 %% list all the instances, id only.
|
-export([ list_instances/0 %% list all the instances, id only.
|
||||||
, list_instances_verbose/0 %% list all the instances
|
, list_instances_verbose/0 %% list all the instances
|
||||||
, get_instance/1 %% return the data of the instance
|
, get_instance/1 %% return the data of the instance
|
||||||
, get_instance_by_type/1 %% return all the instances of the same resource type
|
, get_instance_by_type/1 %% return all the instances of the same resource type
|
||||||
, load_instances/1 %% load instances from config files
|
, load_instances_from_dir/1 %% load instances from a directory
|
||||||
|
, load_instance_from_file/1 %% load an instance from a config file
|
||||||
|
, load_instance_from_config/1 %% load an instance from a map or json-string config
|
||||||
% , dependents/1
|
% , dependents/1
|
||||||
% , inc_counter/2 %% increment the counter of the instance
|
% , inc_counter/2 %% increment the counter of the instance
|
||||||
% , inc_counter/3 %% increment the counter by a given integer
|
% , inc_counter/3 %% increment the counter by a given integer
|
||||||
|
@ -81,14 +85,17 @@
|
||||||
|
|
||||||
-optional_callbacks([ on_query/4
|
-optional_callbacks([ on_query/4
|
||||||
, on_health_check/2
|
, on_health_check/2
|
||||||
, on_api_reply_format/1
|
|
||||||
, on_config_merge/3
|
, on_config_merge/3
|
||||||
|
, on_jsonify/1
|
||||||
|
, on_api_reply_format/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback on_api_reply_format(resource_data()) -> map().
|
-callback on_api_reply_format(resource_data()) -> jsx:json_term().
|
||||||
|
|
||||||
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
||||||
|
|
||||||
|
-callback on_jsonify(resource_config()) -> jsx:json_term().
|
||||||
|
|
||||||
%% when calling emqx_resource:start/1
|
%% when calling emqx_resource:start/1
|
||||||
-callback on_start(instance_id(), resource_config()) ->
|
-callback on_start(instance_id(), resource_config()) ->
|
||||||
{ok, resource_state()} | {error, Reason :: term()}.
|
{ok, resource_state()} | {error, Reason :: term()}.
|
||||||
|
@ -208,9 +215,17 @@ list_instances_verbose() ->
|
||||||
get_instance_by_type(ResourceType) ->
|
get_instance_by_type(ResourceType) ->
|
||||||
emqx_resource_instance:lookup_by_type(ResourceType).
|
emqx_resource_instance:lookup_by_type(ResourceType).
|
||||||
|
|
||||||
-spec load_instances(Dir :: string()) -> ok.
|
-spec load_instances_from_dir(Dir :: string()) -> ok.
|
||||||
load_instances(Dir) ->
|
load_instances_from_dir(Dir) ->
|
||||||
emqx_resource_instance:load(Dir).
|
emqx_resource_instance:load_dir(Dir).
|
||||||
|
|
||||||
|
-spec load_instance_from_file(File :: string()) -> ok.
|
||||||
|
load_instance_from_file(File) ->
|
||||||
|
emqx_resource_instance:load_file(File).
|
||||||
|
|
||||||
|
-spec load_instance_from_config(binary() | map()) -> ok.
|
||||||
|
load_instance_from_config(Config) ->
|
||||||
|
emqx_resource_instance:load_config(Config).
|
||||||
|
|
||||||
-spec call_start(instance_id(), module(), resource_config()) ->
|
-spec call_start(instance_id(), module(), resource_config()) ->
|
||||||
{ok, resource_state()} | {error, Reason :: term()}.
|
{ok, resource_state()} | {error, Reason :: term()}.
|
||||||
|
@ -229,7 +244,28 @@ call_stop(InstId, Mod, ResourceState) ->
|
||||||
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
||||||
resource_config().
|
resource_config().
|
||||||
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
||||||
?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)).
|
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||||
|
true ->
|
||||||
|
?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params));
|
||||||
|
false when is_map(OldConfig), is_map(NewConfig) ->
|
||||||
|
maps:merge(OldConfig, NewConfig);
|
||||||
|
false ->
|
||||||
|
NewConfig
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
|
||||||
|
call_jsonify(Mod, Config) ->
|
||||||
|
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||||
|
false -> Config;
|
||||||
|
true -> ?SAFE_CALL(Mod:on_jsonify(Config))
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec call_api_reply_format(module(), resource_data()) -> jsx:json_term().
|
||||||
|
call_api_reply_format(Mod, Data) ->
|
||||||
|
case erlang:function_exported(Mod, on_api_reply_format, 1) of
|
||||||
|
false -> emqx_resource_api:default_api_reply_format(Data);
|
||||||
|
true -> ?SAFE_CALL(Mod:on_api_reply_format(Data))
|
||||||
|
end.
|
||||||
|
|
||||||
-spec parse_config(resource_type(), binary() | term()) ->
|
-spec parse_config(resource_type(), binary() | term()) ->
|
||||||
{ok, resource_config()} | {error, term()}.
|
{ok, resource_config()} | {error, term()}.
|
||||||
|
@ -240,7 +276,7 @@ parse_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end;
|
end;
|
||||||
parse_config(ResourceType, RawConfigTerm) ->
|
parse_config(ResourceType, RawConfigTerm) ->
|
||||||
parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})).
|
parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
|
||||||
|
|
||||||
-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
|
-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
|
||||||
do_parse_config(ResourceType, MapConfig) ->
|
do_parse_config(ResourceType, MapConfig) ->
|
||||||
|
@ -261,7 +297,7 @@ resource_type_from_str(ResourceType) ->
|
||||||
false -> {error, {invalid_resource, Mod}}
|
false -> {error, {invalid_resource, Mod}}
|
||||||
end
|
end
|
||||||
catch error:badarg ->
|
catch error:badarg ->
|
||||||
{error, {not_found, ResourceType}}
|
{error, {resource_not_found, ResourceType}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
call_instance(InstId, Query) ->
|
call_instance(InstId, Query) ->
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
, put/3
|
, put/3
|
||||||
, delete/3
|
, delete/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([default_api_reply_format/1]).
|
||||||
|
|
||||||
get_all(Mod, _Binding, _Params) ->
|
get_all(Mod, _Binding, _Params) ->
|
||||||
{200, #{code => 0, data =>
|
{200, #{code => 0, data =>
|
||||||
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
|
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
|
||||||
|
@ -34,7 +37,7 @@ get(Mod, #{id := Id}, _Params) ->
|
||||||
|
|
||||||
put(Mod, #{id := Id}, Params) ->
|
put(Mod, #{id := Id}, Params) ->
|
||||||
ConfigParams = proplists:get_value(<<"config">>, Params),
|
ConfigParams = proplists:get_value(<<"config">>, Params),
|
||||||
ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params),
|
ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}),
|
||||||
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
|
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
|
||||||
{ok, ResourceType} ->
|
{ok, ResourceType} ->
|
||||||
do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);
|
do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);
|
||||||
|
@ -63,15 +66,11 @@ delete(_Mod, #{id := Id}, _Params) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format_data(Mod, Data) ->
|
format_data(Mod, Data) ->
|
||||||
case erlang:function_exported(Mod, on_api_reply_format, 1) of
|
emqx_resource:call_api_reply_format(Mod, Data).
|
||||||
false ->
|
|
||||||
default_api_reply_format(Data);
|
|
||||||
true ->
|
|
||||||
Mod:on_api_reply_format(Data)
|
|
||||||
end.
|
|
||||||
|
|
||||||
default_api_reply_format(#{id := Id, status := Status, config := Config}) ->
|
default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) ->
|
||||||
#{node => node(), id => Id, status => Status, config => Config}.
|
#{node => node(), id => Id, status => Status, resource_type => Mod,
|
||||||
|
config => emqx_resource:call_jsonify(Mod, Config)}.
|
||||||
|
|
||||||
stringnify(Bin) when is_binary(Bin) -> Bin;
|
stringnify(Bin) when is_binary(Bin) -> Bin;
|
||||||
stringnify(Str) when is_list(Str) -> list_to_binary(Str);
|
stringnify(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
|
|
|
@ -23,10 +23,13 @@
|
||||||
-export([start_link/2]).
|
-export([start_link/2]).
|
||||||
|
|
||||||
%% load resource instances from *.conf files
|
%% load resource instances from *.conf files
|
||||||
-export([ load/1
|
-export([ load_dir/1
|
||||||
|
, load_file/1
|
||||||
|
, load_config/1
|
||||||
, lookup/1
|
, lookup/1
|
||||||
, list_all/0
|
, list_all/0
|
||||||
, lookup_by_type/1
|
, lookup_by_type/1
|
||||||
|
, create_local/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ hash_call/2
|
-export([ hash_call/2
|
||||||
|
@ -82,8 +85,8 @@ lookup_by_type(ResourceType) ->
|
||||||
[Data || #{mod := Mod} = Data <- list_all()
|
[Data || #{mod := Mod} = Data <- list_all()
|
||||||
, Mod =:= ResourceType].
|
, Mod =:= ResourceType].
|
||||||
|
|
||||||
-spec load(Dir :: string()) -> ok.
|
-spec load_dir(Dir :: string()) -> ok.
|
||||||
load(Dir) ->
|
load_dir(Dir) ->
|
||||||
lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
|
lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
|
||||||
|
|
||||||
load_file(File) ->
|
load_file(File) ->
|
||||||
|
@ -91,40 +94,51 @@ load_file(File) ->
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
logger:error("load resource from ~p failed: ~p", [File, Reason]);
|
logger:error("load resource from ~p failed: ~p", [File, Reason]);
|
||||||
RawConfig ->
|
RawConfig ->
|
||||||
case hocon:binary(RawConfig, #{format => map}) of
|
case load_config(RawConfig) of
|
||||||
{ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr,
|
{ok, Data} ->
|
||||||
<<"config">> := MapConfig}} ->
|
logger:debug("loaded resource instance from file: ~p, data: ~p",
|
||||||
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
|
[File, Data]);
|
||||||
{ok, ResourceType} ->
|
|
||||||
parse_and_load_config(Id, ResourceType, MapConfig);
|
|
||||||
{error, Reason} ->
|
|
||||||
logger:error("no such resource type: ~s, ~p",
|
|
||||||
[ResourceTypeStr, Reason])
|
|
||||||
end;
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
logger:error("load resource from ~p failed: ~p", [File, Reason])
|
logger:error("load resource from ~p failed: ~p", [File, Reason])
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_and_load_config(InstId, ResourceType, MapConfig) ->
|
-spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}.
|
||||||
case emqx_resource:parse_config(ResourceType, MapConfig) of
|
load_config(RawConfig) when is_binary(RawConfig) ->
|
||||||
{error, Reason} ->
|
case hocon:binary(RawConfig, #{format => map}) of
|
||||||
logger:error("parse config for resource ~p of type ~p failed: ~p",
|
{ok, ConfigTerm} -> load_config(ConfigTerm);
|
||||||
[InstId, ResourceType, Reason]);
|
Error -> Error
|
||||||
{ok, InstConf} ->
|
end;
|
||||||
create_instance_local(InstId, ResourceType, InstConf)
|
|
||||||
|
load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) ->
|
||||||
|
MapConfig = maps:get(<<"config">>, Config, #{}),
|
||||||
|
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
|
||||||
|
{ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig);
|
||||||
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
create_instance_local(InstId, ResourceType, InstConf) ->
|
parse_and_load_config(InstId, ResourceType, MapConfig) ->
|
||||||
case do_create(InstId, ResourceType, InstConf) of
|
case emqx_resource:parse_config(ResourceType, MapConfig) of
|
||||||
{ok, Data} ->
|
{ok, InstConf} -> create_local(InstId, ResourceType, InstConf);
|
||||||
logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p",
|
Error -> Error
|
||||||
[ResourceType, InstId, InstConf, Data]);
|
|
||||||
{error, Reason} ->
|
|
||||||
logger:error("create ~p resource instance: ~p failed: ~p, config: ~p",
|
|
||||||
[ResourceType, InstId, Reason, InstConf])
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
create_local(InstId, ResourceType, InstConf) ->
|
||||||
|
case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
|
||||||
|
{ok, Data} -> {ok, Data};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
save_config_to_disk(InstId, ResourceType, Config) ->
|
||||||
|
%% TODO: send an event to the config handler, and the hander (single process)
|
||||||
|
%% will dump configs for all instances (from an ETS table) to a file.
|
||||||
|
file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]),
|
||||||
|
jsx:encode(#{id => InstId, resource_type => ResourceType,
|
||||||
|
config => emqx_resource:call_jsonify(ResourceType, Config)})).
|
||||||
|
|
||||||
|
emqx_data_dir() ->
|
||||||
|
"data".
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -205,7 +219,13 @@ do_create(InstId, ResourceType, Config) ->
|
||||||
#{mod => ResourceType, config => Config,
|
#{mod => ResourceType, config => Config,
|
||||||
state => ResourceState, status => stopped}}),
|
state => ResourceState, status => stopped}}),
|
||||||
_ = do_health_check(InstId),
|
_ = do_health_check(InstId),
|
||||||
{ok, force_lookup(InstId)};
|
case save_config_to_disk(InstId, ResourceType, Config) of
|
||||||
|
ok -> {ok, force_lookup(InstId)};
|
||||||
|
{error, Reason} ->
|
||||||
|
logger:error("save config for ~p resource ~p to disk failed: ~p",
|
||||||
|
[ResourceType, InstId, Reason]),
|
||||||
|
{error, Reason}
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]),
|
logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
|
|
|
@ -57,7 +57,8 @@ forms(_, []) -> [].
|
||||||
form(Mod, Form) ->
|
form(Mod, Form) ->
|
||||||
case Form of
|
case Form of
|
||||||
?Q("-emqx_resource_api_path('@Path').") ->
|
?Q("-emqx_resource_api_path('@Path').") ->
|
||||||
{fix_spec_attrs() ++ fix_api_attrs(erl_syntax:concrete(Path)) ++ fix_api_exports(),
|
{fix_spec_attrs() ++ fix_api_attrs(Mod, erl_syntax:concrete(Path))
|
||||||
|
++ fix_api_exports(),
|
||||||
[],
|
[],
|
||||||
fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
|
fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -75,18 +76,17 @@ fix_spec_funcs(_Mod) ->
|
||||||
, ?Q("structs() -> [\"config\"].")
|
, ?Q("structs() -> [\"config\"].")
|
||||||
].
|
].
|
||||||
|
|
||||||
fix_api_attrs(Path0) ->
|
fix_api_attrs(Mod, Path) ->
|
||||||
BaseName = filename:basename(Path0),
|
BaseName = atom_to_list(Mod),
|
||||||
Path = "/" ++ BaseName,
|
|
||||||
[erl_syntax:revert(
|
[erl_syntax:revert(
|
||||||
erl_syntax:attribute(?Q("rest_api"), [
|
erl_syntax:attribute(?Q("rest_api"), [
|
||||||
erl_syntax:abstract(#{
|
erl_syntax:abstract(#{
|
||||||
name => list_to_atom(Name ++ "_log_tracers"),
|
name => list_to_atom(Act ++ "_" ++ BaseName),
|
||||||
method => Method,
|
method => Method,
|
||||||
path => mk_path(Path, WithId),
|
path => mk_path(Path, WithId),
|
||||||
func => Func,
|
func => Func,
|
||||||
descr => Name ++ " the " ++ BaseName})]))
|
descr => Act ++ " the " ++ BaseName})]))
|
||||||
|| {Name, Method, WithId, Func} <- [
|
|| {Act, Method, WithId, Func} <- [
|
||||||
{"list", 'GET', noid, api_get_all},
|
{"list", 'GET', noid, api_get_all},
|
||||||
{"get", 'GET', id, api_get},
|
{"get", 'GET', id, api_get},
|
||||||
{"update", 'PUT', id, api_put},
|
{"update", 'PUT', id, api_put},
|
||||||
|
@ -110,5 +110,5 @@ fix_api_funcs(Mod) ->
|
||||||
emqx_resource_api:delete('@Mod@', Binding, Params)."))
|
emqx_resource_api:delete('@Mod@', Binding, Params)."))
|
||||||
].
|
].
|
||||||
|
|
||||||
mk_path(Path, id) -> Path ++ "/:bin:id";
|
mk_path(Path, id) -> string:trim(Path, trailing, "/") ++ "/:bin:id";
|
||||||
mk_path(Path, noid) -> Path.
|
mk_path(Path, noid) -> Path.
|
||||||
|
|
Loading…
Reference in New Issue