From b01ae8ece6a80870d538aac6362d5cff6f76faf0 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Aug 2022 11:57:02 +0800 Subject: [PATCH 1/3] chore: refine influxdb bridge/connector i18n --- .../i18n/emqx_resource_schema_i18n.conf | 23 +++++--------- .../i18n/emqx_ee_bridge_influxdb.conf | 30 ++++++++----------- .../i18n/emqx_ee_connector_influxdb.conf | 16 +++++----- 3 files changed, 28 insertions(+), 41 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index aa6579bbd..c07573b1a 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -24,13 +24,8 @@ emqx_resource_schema { start_timeout { desc { - en: """ -If 'start_after_created' enabled, how long time do we wait for the -resource get started, in milliseconds. -""" - zh: """ -如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。 -""" + en: """If 'start_after_created' enabled, how long time do we wait for the resource get started, in milliseconds.""" + zh: """如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。""" } label { en: """Start Timeout""" @@ -40,12 +35,8 @@ resource get started, in milliseconds. auto_restart_interval { desc { - en: """ -The auto restart interval after the resource is disconnected, in milliseconds. -""" - zh: """ -资源断开以后,自动重连的时间间隔,单位毫秒。 -""" + en: """The auto restart interval after the resource is disconnected, in milliseconds.""" + zh: """资源断开以后,自动重连的时间间隔,单位毫秒。""" } label { en: """Auto Restart Interval""" @@ -89,7 +80,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. resume_interval { desc { en: """Resume time interval when resource down.""" - zh: """资源不可用时的重试时间""" + zh: """资源不可用时的重试时间。""" } label { en: """Resume interval""" @@ -100,7 +91,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. async_inflight_window { desc { en: """Async query inflight window.""" - zh: """异步请求飞行队列窗口大小""" + zh: """异步请求飞行队列窗口大小。""" } label { en: """Async inflight window""" @@ -111,7 +102,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. batch_size { desc { en: """Maximum batch count.""" - zh: """批量请求大小""" + zh: """批量请求大小。""" } label { en: """Batch size""" diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 412922408..9e805132e 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -1,15 +1,13 @@ emqx_ee_bridge_influxdb { local_topic { desc { - en: """ -The MQTT topic filter to be forwarded to the InfluxDB. All MQTT 'PUBLISH' messages with the topic + en: """The MQTT topic filter to be forwarded to the InfluxDB. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded. """ - zh: """ -发送到 'local_topic' 的消息都会转发到 InfluxDB。
+ zh: """发送到 'local_topic' 的消息都会转发到 InfluxDB。
注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 InfluxDB。 """ } @@ -20,20 +18,18 @@ will be forwarded. } write_syntax { desc { - en: """ -Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported. + en: """Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported. See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
-TLDR: +TLDR:
``` [,=[,=]] =[,=] [] ``` """ - zh: """ -使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
+ zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及 [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
-TLDR: +TLDR:
``` [,=[,=]] =[,=] [] ``` @@ -46,8 +42,8 @@ TLDR: } config_enable { desc { - en: """Enable or disable this bridge""" - zh: """启用/禁用桥接""" + en: """Enable or disable this bridge.""" + zh: """启用/禁用桥接。""" } label { en: "Enable Or Disable Bridge" @@ -56,8 +52,8 @@ TLDR: } config_direction { desc { - en: """The direction of this bridge, MUST be 'egress'""" - zh: """桥接的方向,必须是 egress""" + en: """The direction of this bridge, MUST be 'egress'.""" + zh: """桥接的方向,必须是 egress。""" } label { en: "Bridge Direction" @@ -68,7 +64,7 @@ TLDR: desc_config { desc { en: """Configuration for an InfluxDB bridge.""" - zh: """InfluxDB 桥接配置""" + zh: """InfluxDB 桥接配置。""" } label: { en: "InfluxDB Bridge Configuration" @@ -78,8 +74,8 @@ TLDR: desc_type { desc { - en: """The Bridge Type""" - zh: """Bridge 类型""" + en: """The Bridge Type.""" + zh: """Bridge 类型。""" } label { en: "Bridge Type" diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index 7e223b9b7..4d2dc168c 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -42,8 +42,8 @@ emqx_ee_connector_influxdb { } protocol { desc { - en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2""" - zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2""" + en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2.""" + zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2。""" } label: { en: """Protocol""" @@ -53,7 +53,7 @@ emqx_ee_connector_influxdb { influxdb_udp { desc { en: """InfluxDB's UDP protocol.""" - zh: """InfluxDB UDP 协议""" + zh: """InfluxDB UDP 协议。""" } label: { en: """UDP Protocol""" @@ -63,7 +63,7 @@ emqx_ee_connector_influxdb { influxdb_api_v1 { desc { en: """InfluxDB's protocol. Support InfluxDB v1.8 and before.""" - zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本""" + zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本。""" } label: { en: """HTTP API Protocol""" @@ -73,7 +73,7 @@ emqx_ee_connector_influxdb { influxdb_api_v2 { desc { en: """InfluxDB's protocol. Support InfluxDB v2.0 and after.""" - zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本""" + zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本。""" } label: { en: """HTTP API V2 Protocol""" @@ -113,7 +113,7 @@ emqx_ee_connector_influxdb { bucket { desc { en: "InfluxDB bucket name." - zh: "InfluxDB bucket 名称" + zh: "InfluxDB bucket 名称。" } label: { en: "Bucket" @@ -152,8 +152,8 @@ emqx_ee_connector_influxdb { } pool_size { desc { - en: """InfluxDB Pool Size""" - zh: """InfluxDB 连接池大小""" + en: """InfluxDB Pool Size. Default value is CPU threads.""" + zh: """InfluxDB 连接池大小,默认为 CPU 线程数。""" } label { en: """InfluxDB Pool Size""" From 68946f1f6c5638d5c41272f69484858706fdb293 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Aug 2022 11:57:37 +0800 Subject: [PATCH 2/3] feat: influxdb support `async`/`batch_async` query --- apps/emqx_resource/src/emqx_resource.erl | 10 +++++ .../src/emqx_ee_connector_influxdb.erl | 40 +++++++++++++------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b79650904..99e1f6057 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -115,6 +115,7 @@ on_query/3, on_batch_query/3, on_query_async/4, + on_batch_query_async/4, on_get_status/2 ]). @@ -131,6 +132,7 @@ %% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +%% when calling emqx_resource:on_query_async/4 -callback on_query_async( resource_id(), Request :: term(), @@ -138,6 +140,14 @@ resource_state() ) -> query_result(). +%% when calling emqx_resource:on_batch_query_async/4 +-callback on_batch_query_async( + resource_id(), + Request :: term(), + {ReplyFun :: function(), Args :: list()}, + resource_state() +) -> query_result(). + %% when calling emqx_resource:health_check/2 -callback on_get_status(resource_id(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index d0c17b6d5..2c2de9a99 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -19,6 +19,7 @@ on_query/3, on_batch_query/3, on_query_async/4, + on_batch_query_async/4, on_get_status/2 ]). @@ -31,7 +32,7 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start(InstId, Config) -> start_client(InstId, Config). @@ -50,17 +51,12 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) -> - case on_get_status(InstId, State) of - connected -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of - {ok, Points} -> - do_query(InstId, Client, Points); - {error, Reason} -> - {error, Reason} - end; - disconnected -> - {resource_down, disconnected} +on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, Reason} -> + {error, Reason} end. on_query_async( @@ -77,6 +73,24 @@ on_query_async( Err end. +on_batch_query_async( + InstId, + BatchData, + {ReplayFun, Args}, + State = #{write_syntax := SyntaxLines, client := Client} +) -> + case on_get_status(InstId, State) of + connected -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, Reason} -> + {error, Reason} + end; + disconnected -> + {resource_down, disconnected} + end. + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of true -> @@ -122,7 +136,7 @@ fields(basic) -> mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") })}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} + {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})} ]; fields(influxdb_udp) -> fields(basic); From d0e923590e09df6ac0b913a78340e8e7eeffffcf Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Aug 2022 14:01:02 +0800 Subject: [PATCH 3/3] fix: write influxdb line with undefined value --- .../emqx_ee_connector/src/emqx_ee_connector_influxdb.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 2c2de9a99..09a09aa44 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -502,7 +502,8 @@ maps_config_to_data(K, V, {Data, Res}) -> case {NK, NV} of {[undefined], _} -> {Data, Res}; - {_, [undefined]} -> + %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] + {_, [undefined | _]} -> {Data, Res}; _ -> {Data, Res#{NK => value_type(NV)}} @@ -512,7 +513,9 @@ value_type([Int, <<"i">>]) when is_integer(Int) -> {int, Int}; -value_type([UInt, <<"u">>]) -> +value_type([UInt, <<"u">>]) when + is_integer(UInt) +-> {uint, UInt}; value_type([<<"t">>]) -> 't';