From 43c964c87ea63365e6acf97ee6966e66281536a5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 23 Aug 2022 18:22:01 +0800 Subject: [PATCH 1/2] fix(docs): ee bridge api docs generation Use try catch. Because function was unexported before called. --- apps/emqx_bridge/src/emqx_bridge_api.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 5fced2467..36f8cf0f5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -168,9 +168,10 @@ bridge_info_examples(Method) -> ). ee_bridge_examples(Method) -> - case erlang:function_exported(emqx_ee_bridge, examples, 1) of - true -> emqx_ee_bridge:examples(Method); - false -> #{} + try + emqx_ee_bridge:examples(Method) + catch + _:_ -> #{} end. info_example(Type, Method) -> From ca6533395c6c028c177289a2254e4f9b54164a48 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Aug 2022 09:40:08 +0800 Subject: [PATCH 2/2] feat: influxdb bridge structure fits new style --- apps/emqx_bridge/src/emqx_bridge.erl | 8 +- .../i18n/emqx_ee_bridge_influxdb.conf | 20 --- .../src/emqx_ee_bridge_influxdb.erl | 54 +++--- .../i18n/emqx_ee_connector_influxdb.conf | 84 ++++----- .../include/emqx_ee_connector.hrl | 5 + .../src/emqx_ee_connector_influxdb.erl | 168 ++++++------------ 6 files changed, 123 insertions(+), 216 deletions(-) create mode 100644 lib-ee/emqx_ee_connector/include/emqx_ee_connector.hrl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 109b1df86..151275bd9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -47,7 +47,13 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). --define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql). +-define(EGRESS_DIR_BRIDGES(T), + T == webhook; + T == mysql; + T == influxdb_api_v1; + T == influxdb_api_v2; + T == influxdb_udp +). load() -> Bridges = emqx:get_config([bridges], #{}), diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 9e805132e..f34de1119 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -49,16 +49,6 @@ TLDR:
en: "Enable Or Disable Bridge" zh: "启用/禁用桥接" } - } - config_direction { - desc { - en: """The direction of this bridge, MUST be 'egress'.""" - zh: """桥接的方向,必须是 egress。""" - } - label { - en: "Bridge Direction" - zh: "桥接方向" - } } desc_config { @@ -94,14 +84,4 @@ TLDR:
} } - desc_connector { - desc { - en: """Generic configuration for the connector.""" - zh: """连接器的通用配置。""" - } - label: { - en: "Connector Generic Configuration" - zh: "连接器通用配置。" - } - } } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index dce315721..740ee20ce 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -84,14 +84,6 @@ namespace() -> "bridge_influxdb". roots() -> []. -fields(basic) -> - [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {write_syntax, fun write_syntax/1} - ] ++ - emqx_resource_schema:fields("resource_opts"); fields("post_udp") -> method_fileds(post, influxdb_udp); fields("post_api_v1") -> @@ -110,35 +102,37 @@ fields("get_api_v1") -> method_fileds(get, influxdb_api_v1); fields("get_api_v2") -> method_fileds(get, influxdb_api_v2); -fields(Name) when - Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 +fields(Type) when + Type == influxdb_udp orelse Type == influxdb_api_v1 orelse Type == influxdb_api_v2 -> - fields(basic) ++ - connector_field(Name). + influxdb_bridge_common_fields() ++ + connector_fields(Type). method_fileds(post, ConnectorType) -> - fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); + influxdb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType); method_fileds(get, ConnectorType) -> - fields(basic) ++ - emqx_bridge_schema:metrics_status_fields() ++ - connector_field(ConnectorType) ++ type_name_field(ConnectorType); + influxdb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType) ++ + emqx_bridge_schema:metrics_status_fields(); method_fileds(put, ConnectorType) -> - fields(basic) ++ connector_field(ConnectorType). + influxdb_bridge_common_fields() ++ + connector_fields(ConnectorType). -connector_field(Type) -> - [ - {connector, - mk( - hoconsc:union([binary(), ref(emqx_ee_connector_influxdb, Type)]), - #{ - required => true, - example => list_to_binary(atom_to_list(Type) ++ ":connector"), - desc => ?DESC(<<"desc_connector">>) - } - )} - ]. +influxdb_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {local_topic, mk(binary(), #{required => true, desc => ?DESC("local_topic")})}, + {write_syntax, fun write_syntax/1} + ] ++ + emqx_resource_schema:fields("resource_opts"). -type_name_field(Type) -> +connector_fields(Type) -> + emqx_ee_connector_influxdb:fields(Type). + +type_name_fields(Type) -> [ {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index ff2266de5..81ea39d49 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -1,43 +1,29 @@ emqx_ee_connector_influxdb { - type { - desc { - en: """The Connector Type.""" - zh: """连接器类型。""" - } - label: { - en: """Connector Type""" - zh: """连接器类型""" - } - } - name { + server { desc { - en: """Connector name, used as a human-readable description of the connector.""" - zh: """连接器名称,人类可读的连接器描述。""" + en: """The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The InfluxDB default port 8086 is used if `[:Port]` is not specified. +""" + zh: """将要连接的 IPv4 或 IPv6 地址,或者主机名。
+主机名具有以下形式:`Host[:Port]`。
+如果未指定 `[:Port]`,则使用 InfluxDB 默认端口 8086。 +""" + } + label { + en: "Server Host" + zh: "服务器地址" } - label: { - en: """Connector Name""" - zh: """连接器名称""" - } } - host { + precision { desc { - en: """InfluxDB host.""" - zh: """InfluxDB 主机地址。""" + en: """InfluxDB time precision.""" + zh: """InfluxDB 时间精度。""" } - label: { - en: """Host""" - zh: """主机""" - } - } - port { - desc { - en: """InfluxDB port.""" - zh: """InfluxDB 端口。""" - } - label: { - en: """Port""" - zh: """端口""" + label { + en: """Time Precision""" + zh: """时间精度""" } } protocol { @@ -45,17 +31,17 @@ emqx_ee_connector_influxdb { en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2.""" zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2。""" } - label: { + label { en: """Protocol""" zh: """协议""" - } + } } influxdb_udp { desc { en: """InfluxDB's UDP protocol.""" zh: """InfluxDB UDP 协议。""" } - label: { + label { en: """UDP Protocol""" zh: """UDP 协议""" } @@ -65,7 +51,7 @@ emqx_ee_connector_influxdb { en: """InfluxDB's protocol. Support InfluxDB v1.8 and before.""" zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本。""" } - label: { + label { en: """HTTP API Protocol""" zh: """HTTP API 协议""" } @@ -75,7 +61,7 @@ emqx_ee_connector_influxdb { en: """InfluxDB's protocol. Support InfluxDB v2.0 and after.""" zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本。""" } - label: { + label { en: """HTTP API V2 Protocol""" zh: """HTTP API V2 协议""" } @@ -85,7 +71,7 @@ emqx_ee_connector_influxdb { en: """InfluxDB database.""" zh: """InfluxDB 数据库。""" } - label: { + label { en: "Database" zh: "数据库" } @@ -95,7 +81,7 @@ emqx_ee_connector_influxdb { en: "InfluxDB username." zh: "InfluxDB 用户名。" } - label: { + label { en: "Username" zh: "用户名" } @@ -105,7 +91,7 @@ emqx_ee_connector_influxdb { en: "InfluxDB password." zh: "InfluxDB 密码。" } - label: { + label { en: "Password" zh: "密码" } @@ -115,7 +101,7 @@ emqx_ee_connector_influxdb { en: "InfluxDB bucket name." zh: "InfluxDB bucket 名称。" } - label: { + label { en: "Bucket" zh: "Bucket" } @@ -125,7 +111,7 @@ emqx_ee_connector_influxdb { en: """Organization name of InfluxDB.""" zh: """InfluxDB 组织名称。""" } - label: { + label { en: """Organization""" zh: """组织""" } @@ -135,20 +121,10 @@ emqx_ee_connector_influxdb { en: """InfluxDB token.""" zh: """InfluxDB token。""" } - label: { + label { en: """Token""" zh: """Token""" } } - precision { - desc { - en: """InfluxDB time precision.""" - zh: """InfluxDB 时间精度。""" - } - label: { - en: """Time Precision""" - zh: """时间精度""" - } - } } diff --git a/lib-ee/emqx_ee_connector/include/emqx_ee_connector.hrl b/lib-ee/emqx_ee_connector/include/emqx_ee_connector.hrl new file mode 100644 index 000000000..73807d13a --- /dev/null +++ b/lib-ee/emqx_ee_connector/include/emqx_ee_connector.hrl @@ -0,0 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%------------------------------------------------------------------- + +-define(INFLUXDB_DEFAULT_PORT, 8086). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index d2725c797..b83aec4bd 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -3,6 +3,9 @@ %%-------------------------------------------------------------------- -module(emqx_ee_connector_influxdb). +-include("emqx_ee_connector.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). + -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -26,10 +29,15 @@ -export([ namespace/0, fields/1, - desc/1, - connector_examples/1 + desc/1 ]). +%% influxdb servers don't need parse +-define(INFLUXDB_HOST_OPTIONS, #{ + host_type => hostname, + default_port => ?INFLUXDB_DEFAULT_PORT +}). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -103,115 +111,41 @@ on_get_status(_InstId, #{client := Client}) -> %% schema namespace() -> connector_influxdb. -fields("udp_get") -> - Key = influxdb_udp, - fields(Key) ++ type_name_field(Key); -fields("udp_post") -> - Key = influxdb_udp, - fields(Key) ++ type_name_field(Key); -fields("udp_put") -> - fields(influxdb_udp); -fields("api_v1_get") -> - Key = influxdb_api_v1, - fields(Key) ++ type_name_field(Key); -fields("api_v1_post") -> - Key = influxdb_api_v1, - fields(Key) ++ type_name_field(Key); -fields("api_v1_put") -> - fields(influxdb_api_v1); -fields("api_v2_get") -> - Key = influxdb_api_v2, - fields(Key) ++ type_name_field(Key); -fields("api_v2_post") -> - Key = influxdb_api_v2, - fields(Key) ++ type_name_field(Key); -fields("api_v2_put") -> - fields(influxdb_api_v2); -fields(basic) -> +fields(common) -> [ - {host, - mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})}, - {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})}, + {server, fun server/1}, {precision, mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") })} ]; fields(influxdb_udp) -> - fields(basic); + fields(common); fields(influxdb_api_v1) -> - [ - {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})} - ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); + fields(common) ++ + [ + {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})} + ] ++ emqx_connector_schema_lib:ssl_fields(); fields(influxdb_api_v2) -> - [ - {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, - {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, - {token, mk(binary(), #{required => true, desc => ?DESC("token")})} - ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic). + fields(common) ++ + [ + {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, + {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, + {token, mk(binary(), #{required => true, desc => ?DESC("token")})} + ] ++ emqx_connector_schema_lib:ssl_fields(). -type_name_field(Type) -> - [ - {type, mk(Type, #{required => true, desc => ?DESC("type")})}, - {name, mk(binary(), #{required => true, desc => ?DESC("name")})} - ]. - -connector_examples(Method) -> - [ - #{ - <<"influxdb_udp">> => #{ - summary => <<"InfluxDB UDP Connector">>, - value => values(udp, Method) - } - }, - #{ - <<"influxdb_api_v1">> => #{ - summary => <<"InfluxDB HTTP API V1 Connector">>, - value => values(api_v1, Method) - } - }, - #{ - <<"influxdb_api_v2">> => #{ - summary => <<"InfluxDB HTTP API V2 Connector">>, - value => values(api_v2, Method) - } - } - ]. - -values(Protocol, get) -> - values(Protocol, post); -values(Protocol, post) -> - Type = list_to_atom("influxdb_" ++ atom_to_list(Protocol)), - maps:merge(values(Protocol, put), #{type => Type, name => <<"connector">>}); -values(udp, put) -> - #{ - host => <<"127.0.0.1">>, - port => 8089, - precision => ms - }; -values(api_v1, put) -> - #{ - host => <<"127.0.0.1">>, - port => 8086, - precision => ms, - database => <<"my_db">>, - username => <<"my_user">>, - password => <<"my_password">>, - ssl => #{enable => false} - }; -values(api_v2, put) -> - #{ - host => <<"127.0.0.1">>, - port => 8086, - precision => ms, - bucket => <<"my_bucket">>, - org => <<"my_org">>, - token => <<"my_token">>, - ssl => #{enable => false} - }. +server(type) -> emqx_schema:ip_port(); +server(required) -> true; +server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; +server(converter) -> fun to_server_raw/1; +server(default) -> <<"127.0.0.1:8086">>; +server(desc) -> ?DESC("server"); +server(_) -> undefined. +desc(common) -> + ?DESC("common"); desc(influxdb_udp) -> ?DESC("influxdb_udp"); desc(influxdb_api_v1) -> @@ -248,9 +182,7 @@ do_start_client( InstId, ClientConfig, Config = #{ - egress := #{ - write_syntax := Lines - } + write_syntax := Lines } ) -> case influxdb:start_client(ClientConfig) of @@ -297,12 +229,11 @@ do_start_client( client_config( InstId, Config = #{ - host := Host, - port := Port + server := {Host, Port} } ) -> [ - {host, binary_to_list(Host)}, + {host, str(Host)}, {port, Port}, {pool_size, erlang:system_info(schedulers)}, {pool, binary_to_atom(InstId, utf8)}, @@ -319,9 +250,9 @@ protocol_config(#{ [ {protocol, http}, {version, v1}, - {username, binary_to_list(Username)}, - {password, binary_to_list(Password)}, - {database, binary_to_list(DB)} + {username, str(Username)}, + {password, str(Password)}, + {database, str(DB)} ] ++ ssl_config(SSL); %% api v1 config protocol_config(#{ @@ -333,8 +264,8 @@ protocol_config(#{ [ {protocol, http}, {version, v2}, - {bucket, binary_to_list(Bucket)}, - {org, binary_to_list(Org)}, + {bucket, str(Bucket)}, + {org, str(Org)}, {token, Token} ] ++ ssl_config(SSL); %% udp config @@ -555,3 +486,18 @@ log_error_points(InstId, Errs) -> end, Errs ). + +%% =================================================================== +%% typereflt funcs + +-spec to_server_raw(string()) -> + {string(), pos_integer()}. +to_server_raw(Server) -> + emqx_connector_schema_lib:parse_server(Server, ?INFLUXDB_HOST_OPTIONS). + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S.