fix: influxdb connector api available

This commit is contained in:
DDDHuang 2022-07-31 15:33:57 +08:00
parent 4c7ca2217c
commit fa54bf5612
3 changed files with 40 additions and 38 deletions

View File

@ -61,7 +61,11 @@ fields("config") ->
{connector, field(connector)} {connector, field(connector)}
]; ];
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [
{type, mk(enum([influxdb]), #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
| fields("config")
];
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
@ -89,11 +93,3 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HStream using `", string:to_upper(Method), "` method."]; ["Configuration for HStream using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->
undefined. undefined.
%% -------------------------------------------------------------------------------------------------
%% internal
type_field() ->
{type, mk(enum([influxdb]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -26,17 +26,16 @@ fields(connectors) ->
hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), hoconsc:map(name, ref(emqx_ee_connector_hstream, config)),
#{desc => <<"EMQX Enterprise Config">>} #{desc => <<"EMQX Enterprise Config">>}
)} )}
]; ] ++ fields(influxdb);
% ] ++ fields(influxdb);
fields(influxdb) -> fields(influxdb) ->
[ [
{ {
influxdb, Protocol,
mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, influxdb_udp)), #{ mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{
desc => <<"EMQX Enterprise Config">> desc => <<"EMQX Enterprise Config">>
}) })
} }
% || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2]
]. ].
connector_examples(Method) -> connector_examples(Method) ->
@ -52,4 +51,7 @@ connector_examples(Method) ->
lists:foldl(Fun, #{}, schema_modules()). lists:foldl(Fun, #{}, schema_modules()).
schema_modules() -> schema_modules() ->
[emqx_ee_connector_hstream, emqx_ee_connector_influxdb]. [
emqx_ee_connector_hstream,
emqx_ee_connector_influxdb
].

View File

@ -11,8 +11,6 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
-define(PUT_FIELDS_FILTER, fun({Name, _}) -> not lists:member(Name, [type, name]) end).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
on_start/2, on_start/2,
@ -50,23 +48,29 @@ on_get_status(_InstId, #{client := Client}) ->
%% schema %% schema
fields("put_udp") -> fields("put_udp") ->
lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_udp)); fields(influxdb_udp);
fields("put_api_v1") -> fields("put_api_v1") ->
lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v1)); fields(influxdb_api_v1);
fields("put_api_v2") -> fields("put_api_v2") ->
lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v2)); fields(influxdb_api_v2);
fields("get_udp") -> fields("get_udp") ->
fields(influxdb_udp); Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("get_api_v1") -> fields("get_api_v1") ->
fields(influxdb_api_v1); Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("get_api_v2") -> fields("get_api_v2") ->
fields(influxdb_api_v2); Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("post_udp") -> fields("post_udp") ->
fields(influxdb_udp); Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("post_api_v1") -> fields("post_api_v1") ->
fields(influxdb_api_v1); Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("post_api_v2") -> fields("post_api_v2") ->
fields(influxdb_api_v2); Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields(basic) -> fields(basic) ->
[ [
{host, {host,
@ -76,28 +80,29 @@ fields(basic) ->
mk(enum([ns, us, ms, s, m, h]), #{ mk(enum([ns, us, ms, s, m, h]), #{
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})}, })},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
]; ];
fields(influxdb_udp) -> fields(influxdb_udp) ->
[ fields(basic);
{type, mk(influxdb_udp, #{required => true, desc => ?DESC("type")})}
] ++ fields(basic);
fields(influxdb_api_v1) -> fields(influxdb_api_v1) ->
[ [
{type, mk(influxdb_api_v1, #{required => true, desc => ?DESC("type")})},
{database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{username, mk(binary(), #{required => true, desc => ?DESC("username")})}, {username, mk(binary(), #{required => true, desc => ?DESC("username")})},
{password, mk(binary(), #{required => true, desc => ?DESC("password")})} {password, mk(binary(), #{required => true, desc => ?DESC("password")})}
] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic);
fields(influxdb_api_v2) -> fields(influxdb_api_v2) ->
[ [
{type, mk(influxdb_api_v2, #{required => true, desc => ?DESC("type")})},
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
{org, mk(binary(), #{required => true, desc => ?DESC("org")})}, {org, mk(binary(), #{required => true, desc => ?DESC("org")})},
{token, mk(binary(), #{required => true, desc => ?DESC("token")})} {token, mk(binary(), #{required => true, desc => ?DESC("token")})}
] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic). ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic).
type_name_field(Type) ->
[
{type, mk(Type, #{required => true, desc => ?DESC("type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
].
connector_examples(Method) -> connector_examples(Method) ->
[ [
#{ #{
@ -123,9 +128,8 @@ connector_examples(Method) ->
values(Protocol, get) -> values(Protocol, get) ->
values(Protocol, post); values(Protocol, post);
values(Protocol, post) -> values(Protocol, post) ->
Type = list_to_atom(io_lib:format("influxdb_~p", [Protocol])), Type = list_to_atom("influxdb_" ++ atom_to_list(Protocol)),
ConnectorName = list_to_binary(io_lib:format("~p_connector", [Protocol])), maps:merge(values(Protocol, put), #{type => Type, name => <<"connector">>});
maps:merge(values(Protocol, put), #{type => Type, name => ConnectorName});
values(udp, put) -> values(udp, put) ->
#{ #{
host => <<"127.0.0.1">>, host => <<"127.0.0.1">>,
@ -219,7 +223,7 @@ do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadB
end. end.
client_config( client_config(
_InstId, InstId,
Config = #{ Config = #{
host := Host, host := Host,
port := Port, port := Port,
@ -230,7 +234,7 @@ client_config(
{host, Host}, {host, Host},
{port, Port}, {port, Port},
{pool_size, PoolSize}, {pool_size, PoolSize},
{pool, atom_pname_todo}, {pool, binary_to_atom(InstId, utf8)},
{precision, maps:get(precision, Config, ms)} {precision, maps:get(precision, Config, ms)}
] ++ protocol_config(Config). ] ++ protocol_config(Config).