Merge pull request #8785 from JimMoen/feat-influxdb-config

Remove influxdb connector config layer. See #8773
This commit is contained in:
JimMoen 2022-08-24 10:56:26 +08:00 committed by GitHub
commit eb21a37145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 127 additions and 219 deletions

View File

@ -47,7 +47,13 @@
%% exported for `emqx_telemetry' %% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]). -export([get_basic_usage_info/0]).
-define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql). -define(EGRESS_DIR_BRIDGES(T),
T == webhook;
T == mysql;
T == influxdb_api_v1;
T == influxdb_api_v2;
T == influxdb_udp
).
load() -> load() ->
Bridges = emqx:get_config([bridges], #{}), Bridges = emqx:get_config([bridges], #{}),

View File

@ -168,9 +168,10 @@ bridge_info_examples(Method) ->
). ).
ee_bridge_examples(Method) -> ee_bridge_examples(Method) ->
case erlang:function_exported(emqx_ee_bridge, examples, 1) of try
true -> emqx_ee_bridge:examples(Method); emqx_ee_bridge:examples(Method)
false -> #{} catch
_:_ -> #{}
end. end.
info_example(Type, Method) -> info_example(Type, Method) ->

View File

@ -49,16 +49,6 @@ TLDR: </br>
en: "Enable Or Disable Bridge" en: "Enable Or Disable Bridge"
zh: "启用/禁用桥接" zh: "启用/禁用桥接"
} }
}
config_direction {
desc {
en: """The direction of this bridge, MUST be 'egress'."""
zh: """桥接的方向,必须是 egress。"""
}
label {
en: "Bridge Direction"
zh: "桥接方向"
}
} }
desc_config { desc_config {
@ -94,14 +84,4 @@ TLDR: </br>
} }
} }
desc_connector {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置。"
}
}
} }

View File

@ -84,14 +84,6 @@ namespace() -> "bridge_influxdb".
roots() -> []. roots() -> [].
fields(basic) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{write_syntax, fun write_syntax/1}
] ++
emqx_resource_schema:fields("resource_opts");
fields("post_udp") -> fields("post_udp") ->
method_fileds(post, influxdb_udp); method_fileds(post, influxdb_udp);
fields("post_api_v1") -> fields("post_api_v1") ->
@ -110,35 +102,37 @@ fields("get_api_v1") ->
method_fileds(get, influxdb_api_v1); method_fileds(get, influxdb_api_v1);
fields("get_api_v2") -> fields("get_api_v2") ->
method_fileds(get, influxdb_api_v2); method_fileds(get, influxdb_api_v2);
fields(Name) when fields(Type) when
Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 Type == influxdb_udp orelse Type == influxdb_api_v1 orelse Type == influxdb_api_v2
-> ->
fields(basic) ++ influxdb_bridge_common_fields() ++
connector_field(Name). connector_fields(Type).
method_fileds(post, ConnectorType) -> method_fileds(post, ConnectorType) ->
fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); influxdb_bridge_common_fields() ++
connector_fields(ConnectorType) ++
type_name_fields(ConnectorType);
method_fileds(get, ConnectorType) -> method_fileds(get, ConnectorType) ->
fields(basic) ++ influxdb_bridge_common_fields() ++
emqx_bridge_schema:metrics_status_fields() ++ connector_fields(ConnectorType) ++
connector_field(ConnectorType) ++ type_name_field(ConnectorType); type_name_fields(ConnectorType) ++
emqx_bridge_schema:metrics_status_fields();
method_fileds(put, ConnectorType) -> method_fileds(put, ConnectorType) ->
fields(basic) ++ connector_field(ConnectorType). influxdb_bridge_common_fields() ++
connector_fields(ConnectorType).
connector_field(Type) -> influxdb_bridge_common_fields() ->
[ emqx_bridge_schema:common_bridge_fields() ++
{connector, [
mk( {local_topic, mk(binary(), #{required => true, desc => ?DESC("local_topic")})},
hoconsc:union([binary(), ref(emqx_ee_connector_influxdb, Type)]), {write_syntax, fun write_syntax/1}
#{ ] ++
required => true, emqx_resource_schema:fields("resource_opts").
example => list_to_binary(atom_to_list(Type) ++ ":connector"),
desc => ?DESC(<<"desc_connector">>)
}
)}
].
type_name_field(Type) -> connector_fields(Type) ->
emqx_ee_connector_influxdb:fields(Type).
type_name_fields(Type) ->
[ [
{type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}

View File

@ -1,43 +1,29 @@
emqx_ee_connector_influxdb { emqx_ee_connector_influxdb {
type {
desc {
en: """The Connector Type."""
zh: """连接器类型。"""
}
label: {
en: """Connector Type"""
zh: """连接器类型"""
}
}
name { server {
desc { desc {
en: """Connector name, used as a human-readable description of the connector.""" en: """The IPv4 or IPv6 address or the hostname to connect to.</br>
zh: """连接器名称,人类可读的连接器描述。""" A host entry has the following form: `Host[:Port]`.</br>
The InfluxDB default port 8086 is used if `[:Port]` is not specified.
"""
zh: """将要连接的 IPv4 或 IPv6 地址,或者主机名。</br>
主机名具有以下形式:`Host[:Port]`。</br>
如果未指定 `[:Port]`,则使用 InfluxDB 默认端口 8086。
"""
}
label {
en: "Server Host"
zh: "服务器地址"
} }
label: {
en: """Connector Name"""
zh: """连接器名称"""
}
} }
host { precision {
desc { desc {
en: """InfluxDB host.""" en: """InfluxDB time precision."""
zh: """InfluxDB 主机地址。""" zh: """InfluxDB 时间精度。"""
} }
label: { label {
en: """Host""" en: """Time Precision"""
zh: """主机""" zh: """时间精度"""
}
}
port {
desc {
en: """InfluxDB port."""
zh: """InfluxDB 端口。"""
}
label: {
en: """Port"""
zh: """端口"""
} }
} }
protocol { protocol {
@ -45,17 +31,17 @@ emqx_ee_connector_influxdb {
en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2.""" en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2."""
zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2。""" zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2。"""
} }
label: { label {
en: """Protocol""" en: """Protocol"""
zh: """协议""" zh: """协议"""
} }
} }
influxdb_udp { influxdb_udp {
desc { desc {
en: """InfluxDB's UDP protocol.""" en: """InfluxDB's UDP protocol."""
zh: """InfluxDB UDP 协议。""" zh: """InfluxDB UDP 协议。"""
} }
label: { label {
en: """UDP Protocol""" en: """UDP Protocol"""
zh: """UDP 协议""" zh: """UDP 协议"""
} }
@ -65,7 +51,7 @@ emqx_ee_connector_influxdb {
en: """InfluxDB's protocol. Support InfluxDB v1.8 and before.""" en: """InfluxDB's protocol. Support InfluxDB v1.8 and before."""
zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本。""" zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本。"""
} }
label: { label {
en: """HTTP API Protocol""" en: """HTTP API Protocol"""
zh: """HTTP API 协议""" zh: """HTTP API 协议"""
} }
@ -75,7 +61,7 @@ emqx_ee_connector_influxdb {
en: """InfluxDB's protocol. Support InfluxDB v2.0 and after.""" en: """InfluxDB's protocol. Support InfluxDB v2.0 and after."""
zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本。""" zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本。"""
} }
label: { label {
en: """HTTP API V2 Protocol""" en: """HTTP API V2 Protocol"""
zh: """HTTP API V2 协议""" zh: """HTTP API V2 协议"""
} }
@ -85,7 +71,7 @@ emqx_ee_connector_influxdb {
en: """InfluxDB database.""" en: """InfluxDB database."""
zh: """InfluxDB 数据库。""" zh: """InfluxDB 数据库。"""
} }
label: { label {
en: "Database" en: "Database"
zh: "数据库" zh: "数据库"
} }
@ -95,7 +81,7 @@ emqx_ee_connector_influxdb {
en: "InfluxDB username." en: "InfluxDB username."
zh: "InfluxDB 用户名。" zh: "InfluxDB 用户名。"
} }
label: { label {
en: "Username" en: "Username"
zh: "用户名" zh: "用户名"
} }
@ -105,7 +91,7 @@ emqx_ee_connector_influxdb {
en: "InfluxDB password." en: "InfluxDB password."
zh: "InfluxDB 密码。" zh: "InfluxDB 密码。"
} }
label: { label {
en: "Password" en: "Password"
zh: "密码" zh: "密码"
} }
@ -115,7 +101,7 @@ emqx_ee_connector_influxdb {
en: "InfluxDB bucket name." en: "InfluxDB bucket name."
zh: "InfluxDB bucket 名称。" zh: "InfluxDB bucket 名称。"
} }
label: { label {
en: "Bucket" en: "Bucket"
zh: "Bucket" zh: "Bucket"
} }
@ -125,7 +111,7 @@ emqx_ee_connector_influxdb {
en: """Organization name of InfluxDB.""" en: """Organization name of InfluxDB."""
zh: """InfluxDB 组织名称。""" zh: """InfluxDB 组织名称。"""
} }
label: { label {
en: """Organization""" en: """Organization"""
zh: """组织""" zh: """组织"""
} }
@ -135,20 +121,10 @@ emqx_ee_connector_influxdb {
en: """InfluxDB token.""" en: """InfluxDB token."""
zh: """InfluxDB token。""" zh: """InfluxDB token。"""
} }
label: { label {
en: """Token""" en: """Token"""
zh: """Token""" zh: """Token"""
} }
} }
precision {
desc {
en: """InfluxDB time precision."""
zh: """InfluxDB 时间精度。"""
}
label: {
en: """Time Precision"""
zh: """时间精度"""
}
}
} }

View File

@ -0,0 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------
-define(INFLUXDB_DEFAULT_PORT, 8086).

View File

@ -3,6 +3,9 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_connector_influxdb). -module(emqx_ee_connector_influxdb).
-include("emqx_ee_connector.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -26,10 +29,15 @@
-export([ -export([
namespace/0, namespace/0,
fields/1, fields/1,
desc/1, desc/1
connector_examples/1
]). ]).
%% influxdb servers don't need parse
-define(INFLUXDB_HOST_OPTIONS, #{
host_type => hostname,
default_port => ?INFLUXDB_DEFAULT_PORT
}).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
@ -103,115 +111,41 @@ on_get_status(_InstId, #{client := Client}) ->
%% schema %% schema
namespace() -> connector_influxdb. namespace() -> connector_influxdb.
fields("udp_get") -> fields(common) ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("udp_post") ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("udp_put") ->
fields(influxdb_udp);
fields("api_v1_get") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("api_v1_post") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("api_v1_put") ->
fields(influxdb_api_v1);
fields("api_v2_get") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("api_v2_post") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("api_v2_put") ->
fields(influxdb_api_v2);
fields(basic) ->
[ [
{host, {server, fun server/1},
mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})},
{port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})},
{precision, {precision,
mk(enum([ns, us, ms, s, m, h]), #{ mk(enum([ns, us, ms, s, m, h]), #{
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})} })}
]; ];
fields(influxdb_udp) -> fields(influxdb_udp) ->
fields(basic); fields(common);
fields(influxdb_api_v1) -> fields(influxdb_api_v1) ->
[ fields(common) ++
{database, mk(binary(), #{required => true, desc => ?DESC("database")})}, [
{username, mk(binary(), #{desc => ?DESC("username")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})} {username, mk(binary(), #{desc => ?DESC("username")})},
] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); {password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(influxdb_api_v2) -> fields(influxdb_api_v2) ->
[ fields(common) ++
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, [
{org, mk(binary(), #{required => true, desc => ?DESC("org")})}, {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
{token, mk(binary(), #{required => true, desc => ?DESC("token")})} {org, mk(binary(), #{required => true, desc => ?DESC("org")})},
] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic). {token, mk(binary(), #{required => true, desc => ?DESC("token")})}
] ++ emqx_connector_schema_lib:ssl_fields().
type_name_field(Type) -> server(type) -> emqx_schema:ip_port();
[ server(required) -> true;
{type, mk(Type, #{required => true, desc => ?DESC("type")})}, server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
{name, mk(binary(), #{required => true, desc => ?DESC("name")})} server(converter) -> fun to_server_raw/1;
]. server(default) -> <<"127.0.0.1:8086">>;
server(desc) -> ?DESC("server");
connector_examples(Method) -> server(_) -> undefined.
[
#{
<<"influxdb_udp">> => #{
summary => <<"InfluxDB UDP Connector">>,
value => values(udp, Method)
}
},
#{
<<"influxdb_api_v1">> => #{
summary => <<"InfluxDB HTTP API V1 Connector">>,
value => values(api_v1, Method)
}
},
#{
<<"influxdb_api_v2">> => #{
summary => <<"InfluxDB HTTP API V2 Connector">>,
value => values(api_v2, Method)
}
}
].
values(Protocol, get) ->
values(Protocol, post);
values(Protocol, post) ->
Type = list_to_atom("influxdb_" ++ atom_to_list(Protocol)),
maps:merge(values(Protocol, put), #{type => Type, name => <<"connector">>});
values(udp, put) ->
#{
host => <<"127.0.0.1">>,
port => 8089,
precision => ms
};
values(api_v1, put) ->
#{
host => <<"127.0.0.1">>,
port => 8086,
precision => ms,
database => <<"my_db">>,
username => <<"my_user">>,
password => <<"my_password">>,
ssl => #{enable => false}
};
values(api_v2, put) ->
#{
host => <<"127.0.0.1">>,
port => 8086,
precision => ms,
bucket => <<"my_bucket">>,
org => <<"my_org">>,
token => <<"my_token">>,
ssl => #{enable => false}
}.
desc(common) ->
?DESC("common");
desc(influxdb_udp) -> desc(influxdb_udp) ->
?DESC("influxdb_udp"); ?DESC("influxdb_udp");
desc(influxdb_api_v1) -> desc(influxdb_api_v1) ->
@ -248,9 +182,7 @@ do_start_client(
InstId, InstId,
ClientConfig, ClientConfig,
Config = #{ Config = #{
egress := #{ write_syntax := Lines
write_syntax := Lines
}
} }
) -> ) ->
case influxdb:start_client(ClientConfig) of case influxdb:start_client(ClientConfig) of
@ -297,12 +229,11 @@ do_start_client(
client_config( client_config(
InstId, InstId,
Config = #{ Config = #{
host := Host, server := {Host, Port}
port := Port
} }
) -> ) ->
[ [
{host, binary_to_list(Host)}, {host, str(Host)},
{port, Port}, {port, Port},
{pool_size, erlang:system_info(schedulers)}, {pool_size, erlang:system_info(schedulers)},
{pool, binary_to_atom(InstId, utf8)}, {pool, binary_to_atom(InstId, utf8)},
@ -319,9 +250,9 @@ protocol_config(#{
[ [
{protocol, http}, {protocol, http},
{version, v1}, {version, v1},
{username, binary_to_list(Username)}, {username, str(Username)},
{password, binary_to_list(Password)}, {password, str(Password)},
{database, binary_to_list(DB)} {database, str(DB)}
] ++ ssl_config(SSL); ] ++ ssl_config(SSL);
%% api v1 config %% api v1 config
protocol_config(#{ protocol_config(#{
@ -333,8 +264,8 @@ protocol_config(#{
[ [
{protocol, http}, {protocol, http},
{version, v2}, {version, v2},
{bucket, binary_to_list(Bucket)}, {bucket, str(Bucket)},
{org, binary_to_list(Org)}, {org, str(Org)},
{token, Token} {token, Token}
] ++ ssl_config(SSL); ] ++ ssl_config(SSL);
%% udp config %% udp config
@ -555,3 +486,18 @@ log_error_points(InstId, Errs) ->
end, end,
Errs Errs
). ).
%% ===================================================================
%% typereflt funcs
-spec to_server_raw(string()) ->
{string(), pos_integer()}.
to_server_raw(Server) ->
emqx_connector_schema_lib:parse_server(Server, ?INFLUXDB_HOST_OPTIONS).
str(A) when is_atom(A) ->
atom_to_list(A);
str(B) when is_binary(B) ->
binary_to_list(B);
str(S) when is_list(S) ->
S.