fix(emqx_connector): the HTTP API for emqx_connector_mysql
This commit is contained in:
parent
bea5966ac8
commit
3da62e59d6
|
@ -21,7 +21,8 @@
|
||||||
-emqx_resource_api_path("connectors/mysql").
|
-emqx_resource_api_path("connectors/mysql").
|
||||||
|
|
||||||
-export([ fields/1
|
-export([ fields/1
|
||||||
, on_config_to_file/1
|
, on_jsonify/1
|
||||||
|
, on_api_reply_format/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% callbacks of behaviour emqx_resource
|
%% callbacks of behaviour emqx_resource
|
||||||
|
@ -40,8 +41,21 @@ fields("config") ->
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
on_config_to_file(#{server := Server} = Config) ->
|
on_jsonify(#{server := Server, user := User, database := DB, password := Passwd,
|
||||||
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
|
cacertfile := CAFile, keyfile := KeyFile, certfile := CertFile} = Config) ->
|
||||||
|
Config#{
|
||||||
|
user => list_to_binary(User),
|
||||||
|
database => list_to_binary(DB),
|
||||||
|
password => list_to_binary(Passwd),
|
||||||
|
server => emqx_connector_schema_lib:ip_port_to_string(Server),
|
||||||
|
cacertfile => list_to_binary(CAFile),
|
||||||
|
keyfile => list_to_binary(KeyFile),
|
||||||
|
certfile => list_to_binary(CertFile)
|
||||||
|
}.
|
||||||
|
|
||||||
|
on_api_reply_format(ResourceData) ->
|
||||||
|
#{config := Conf} = Reply0 = emqx_resource_api:default_api_reply_format(ResourceData),
|
||||||
|
Reply0#{config => maps:without([cacertfile, keyfile, certfile, verify], Conf)}.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
|
|
|
@ -118,4 +118,4 @@ to_ip_port(Str) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ip_port_to_string({Ip, Port}) ->
|
ip_port_to_string({Ip, Port}) ->
|
||||||
inet:ntoa(Ip) ++ ":" ++ integer_to_list(Port).
|
iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{erl_opts, [ debug_info
|
{erl_opts, [ debug_info
|
||||||
, nowarn_unused_import
|
, nowarn_unused_import
|
||||||
%, {d, 'RESOURCE_DEBUG'}
|
, {d, 'RESOURCE_DEBUG'}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{erl_first_files, ["src/emqx_resource_transform.erl"]}.
|
{erl_first_files, ["src/emqx_resource_transform.erl"]}.
|
||||||
|
|
|
@ -65,7 +65,8 @@
|
||||||
, call_health_check/3 %% verify if the resource is working normally
|
, call_health_check/3 %% verify if the resource is working normally
|
||||||
, call_stop/3 %% stop the instance
|
, call_stop/3 %% stop the instance
|
||||||
, call_config_merge/4 %% merge the config when updating
|
, call_config_merge/4 %% merge the config when updating
|
||||||
, call_config_to_file/2
|
, call_jsonify/2
|
||||||
|
, call_api_reply_format/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ list_instances/0 %% list all the instances, id only.
|
-export([ list_instances/0 %% list all the instances, id only.
|
||||||
|
@ -84,16 +85,16 @@
|
||||||
|
|
||||||
-optional_callbacks([ on_query/4
|
-optional_callbacks([ on_query/4
|
||||||
, on_health_check/2
|
, on_health_check/2
|
||||||
, on_api_reply_format/1
|
|
||||||
, on_config_merge/3
|
, on_config_merge/3
|
||||||
, on_config_to_file/1
|
, on_jsonify/1
|
||||||
|
, on_api_reply_format/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback on_api_reply_format(resource_data()) -> map().
|
-callback on_api_reply_format(resource_data()) -> jsx:json_term().
|
||||||
|
|
||||||
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
||||||
|
|
||||||
-callback on_config_to_file(resource_config()) -> jsx:json_term().
|
-callback on_jsonify(resource_config()) -> jsx:json_term().
|
||||||
|
|
||||||
%% when calling emqx_resource:start/1
|
%% when calling emqx_resource:start/1
|
||||||
-callback on_start(instance_id(), resource_config()) ->
|
-callback on_start(instance_id(), resource_config()) ->
|
||||||
|
@ -243,11 +244,28 @@ call_stop(InstId, Mod, ResourceState) ->
|
||||||
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
||||||
resource_config().
|
resource_config().
|
||||||
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
||||||
?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)).
|
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||||
|
true ->
|
||||||
|
?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params));
|
||||||
|
false when is_map(OldConfig), is_map(NewConfig) ->
|
||||||
|
maps:merge(OldConfig, NewConfig);
|
||||||
|
false ->
|
||||||
|
NewConfig
|
||||||
|
end.
|
||||||
|
|
||||||
-spec call_config_to_file(module(), resource_config()) -> jsx:json_term().
|
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
|
||||||
call_config_to_file(Mod, Config) ->
|
call_jsonify(Mod, Config) ->
|
||||||
?SAFE_CALL(Mod:on_config_to_file(Config)).
|
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||||
|
false -> Config;
|
||||||
|
true -> ?SAFE_CALL(Mod:on_jsonify(Config))
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec call_api_reply_format(module(), resource_data()) -> jsx:json_term().
|
||||||
|
call_api_reply_format(Mod, Data) ->
|
||||||
|
case erlang:function_exported(Mod, on_api_reply_format, 1) of
|
||||||
|
false -> emqx_resource_api:default_api_reply_format(Data);
|
||||||
|
true -> ?SAFE_CALL(Mod:on_api_reply_format(Data))
|
||||||
|
end.
|
||||||
|
|
||||||
-spec parse_config(resource_type(), binary() | term()) ->
|
-spec parse_config(resource_type(), binary() | term()) ->
|
||||||
{ok, resource_config()} | {error, term()}.
|
{ok, resource_config()} | {error, term()}.
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
, put/3
|
, put/3
|
||||||
, delete/3
|
, delete/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([default_api_reply_format/1]).
|
||||||
|
|
||||||
get_all(Mod, _Binding, _Params) ->
|
get_all(Mod, _Binding, _Params) ->
|
||||||
{200, #{code => 0, data =>
|
{200, #{code => 0, data =>
|
||||||
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
|
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
|
||||||
|
@ -63,15 +66,11 @@ delete(_Mod, #{id := Id}, _Params) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format_data(Mod, Data) ->
|
format_data(Mod, Data) ->
|
||||||
case erlang:function_exported(Mod, on_api_reply_format, 1) of
|
emqx_resource:call_api_reply_format(Mod, Data).
|
||||||
false ->
|
|
||||||
default_api_reply_format(Data);
|
|
||||||
true ->
|
|
||||||
Mod:on_api_reply_format(Data)
|
|
||||||
end.
|
|
||||||
|
|
||||||
default_api_reply_format(#{id := Id, status := Status, config := Config}) ->
|
default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) ->
|
||||||
#{node => node(), id => Id, status => Status, config => Config}.
|
#{node => node(), id => Id, status => Status, resource_type => Mod,
|
||||||
|
config => emqx_resource:call_jsonify(Mod, Config)}.
|
||||||
|
|
||||||
stringnify(Bin) when is_binary(Bin) -> Bin;
|
stringnify(Bin) when is_binary(Bin) -> Bin;
|
||||||
stringnify(Str) when is_list(Str) -> list_to_binary(Str);
|
stringnify(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
|
|
|
@ -134,7 +134,7 @@ save_config_to_disk(InstId, ResourceType, Config) ->
|
||||||
%% will dump configs for all instances (from an ETS table) to a file.
|
%% will dump configs for all instances (from an ETS table) to a file.
|
||||||
file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]),
|
file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]),
|
||||||
jsx:encode(#{id => InstId, resource_type => ResourceType,
|
jsx:encode(#{id => InstId, resource_type => ResourceType,
|
||||||
config => emqx_resource:call_config_to_file(ResourceType, Config)})).
|
config => emqx_resource:call_jsonify(ResourceType, Config)})).
|
||||||
|
|
||||||
emqx_data_dir() ->
|
emqx_data_dir() ->
|
||||||
"data".
|
"data".
|
||||||
|
|
|
@ -57,7 +57,8 @@ forms(_, []) -> [].
|
||||||
form(Mod, Form) ->
|
form(Mod, Form) ->
|
||||||
case Form of
|
case Form of
|
||||||
?Q("-emqx_resource_api_path('@Path').") ->
|
?Q("-emqx_resource_api_path('@Path').") ->
|
||||||
{fix_spec_attrs() ++ fix_api_attrs(erl_syntax:concrete(Path)) ++ fix_api_exports(),
|
{fix_spec_attrs() ++ fix_api_attrs(Mod, erl_syntax:concrete(Path))
|
||||||
|
++ fix_api_exports(),
|
||||||
[],
|
[],
|
||||||
fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
|
fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -75,18 +76,17 @@ fix_spec_funcs(_Mod) ->
|
||||||
, ?Q("structs() -> [\"config\"].")
|
, ?Q("structs() -> [\"config\"].")
|
||||||
].
|
].
|
||||||
|
|
||||||
fix_api_attrs(Path0) ->
|
fix_api_attrs(Mod, Path) ->
|
||||||
BaseName = filename:basename(Path0),
|
BaseName = atom_to_list(Mod),
|
||||||
Path = "/" ++ BaseName,
|
|
||||||
[erl_syntax:revert(
|
[erl_syntax:revert(
|
||||||
erl_syntax:attribute(?Q("rest_api"), [
|
erl_syntax:attribute(?Q("rest_api"), [
|
||||||
erl_syntax:abstract(#{
|
erl_syntax:abstract(#{
|
||||||
name => list_to_atom(Name ++ "_log_tracers"),
|
name => list_to_atom(Act ++ "_" ++ BaseName),
|
||||||
method => Method,
|
method => Method,
|
||||||
path => mk_path(Path, WithId),
|
path => mk_path(Path, WithId),
|
||||||
func => Func,
|
func => Func,
|
||||||
descr => Name ++ " the " ++ BaseName})]))
|
descr => Act ++ " the " ++ BaseName})]))
|
||||||
|| {Name, Method, WithId, Func} <- [
|
|| {Act, Method, WithId, Func} <- [
|
||||||
{"list", 'GET', noid, api_get_all},
|
{"list", 'GET', noid, api_get_all},
|
||||||
{"get", 'GET', id, api_get},
|
{"get", 'GET', id, api_get},
|
||||||
{"update", 'PUT', id, api_put},
|
{"update", 'PUT', id, api_put},
|
||||||
|
@ -110,5 +110,5 @@ fix_api_funcs(Mod) ->
|
||||||
emqx_resource_api:delete('@Mod@', Binding, Params)."))
|
emqx_resource_api:delete('@Mod@', Binding, Params)."))
|
||||||
].
|
].
|
||||||
|
|
||||||
mk_path(Path, id) -> Path ++ "/:bin:id";
|
mk_path(Path, id) -> string:trim(Path, trailing, "/") ++ "/:bin:id";
|
||||||
mk_path(Path, noid) -> Path.
|
mk_path(Path, noid) -> Path.
|
||||||
|
|
Loading…
Reference in New Issue