diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 27f43d0ea..488189d81 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -205,7 +205,7 @@ jobs: - emqx - emqx-enterprise runs-on: aws-amd64 - container: "ghcr.io/emqx/emqx-schema-validate:0.3.3" + container: "ghcr.io/emqx/emqx-schema-validate:0.3.5" steps: - uses: actions/download-artifact@v2 name: Download schema dump diff --git a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl index f11247d68..d833e6ca8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl @@ -9,7 +9,7 @@ %%====================================================================================== %% Hocon Schema Definitions -namespace() -> "bridge". +namespace() -> "bridge_webhook". roots() -> []. diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstreamdb.conf similarity index 99% rename from lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf rename to lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstreamdb.conf index 2e4397a04..dd3346579 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstreamdb.conf @@ -1,4 +1,4 @@ -emqx_ee_bridge_hstream { +emqx_ee_bridge_hstreamdb { local_topic { desc { en: """ diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 3930825e5..ffd0b66a0 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -18,45 +18,31 @@ will be forwarded. zh: "本地 Topic" } } - measurement { + write_syntax { desc { - en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符""" + en: """ +Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported. +See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and +[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+TLDR: +``` +[,=[,=]] =[,=] [] +``` +""" + zh: """ +使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
+参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及 +[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+TLDR: +``` +[,=[,=]] =[,=] [] +``` +""" } label { - en: "Measurement" - zh: "Measurement" - } - } - timestamp { - desc { - en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp""" - zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳""" + en: "write_syntax" + zh: "写语句" } - label { - en: "Timestamp" - zh: "Timestamp" - } - } - tags { - desc { - en: """The tags to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符""" - } - label { - en: "Tags" - zh: "Tags" - } - } - fields { - desc { - en: """The fields to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符""" - } - label { - en: "Fields" - zh: "Fields" - } } config_enable { desc { @@ -111,6 +97,7 @@ will be forwarded. zh: "桥接名字" } } + desc_connector { desc { en: """Generic configuration for the connector.""" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index a690b50d0..ebc06d211 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -14,8 +14,8 @@ api_schemas(Method) -> [ - ref(emqx_ee_bridge_hstream, Method), ref(emqx_ee_bridge_mysql, Method), + ref(emqx_ee_bridge_hstreamdb, Method), ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") @@ -23,7 +23,7 @@ api_schemas(Method) -> schema_modules() -> [ - emqx_ee_bridge_hstream, + emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, emqx_ee_bridge_mysql ]. @@ -41,7 +41,7 @@ conn_bridge_examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -resource_type(hstreamdb) -> emqx_ee_connector_hstream; +resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(mysql) -> emqx_connector_mysql; resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; @@ -51,7 +51,7 @@ fields(bridges) -> [ {hstreamdb, mk( - hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), + hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), #{desc => <<"EMQX Enterprise Config">>} )}, {mysql, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl similarity index 94% rename from lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl rename to lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 200e695da..3b5183150 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_bridge_hstream). +-module(emqx_ee_bridge_hstreamdb). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -50,7 +50,7 @@ values(put) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge". +namespace() -> "bridge_hstreamdb". roots() -> []. @@ -71,7 +71,7 @@ fields("get") -> field(connector) -> mk( - hoconsc:union([binary(), ref(emqx_ee_connector_hstream, config)]), + hoconsc:union([binary(), ref(emqx_ee_connector_hstreamdb, config)]), #{ required => true, example => <<"hstreamdb:demo">>, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index a19459208..83c5a4127 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -3,9 +3,10 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_influxdb). +-include("emqx_ee_bridge.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include("emqx_ee_bridge.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -48,6 +49,12 @@ conn_bridge_examples(Method) -> values(Protocol, get) -> maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); values(Protocol, post) -> + case Protocol of + "influxdb_api_v2" -> + SupportUint = <<"uint_value=${payload.uint_key}u">>; + _ -> + SupportUint = <<>> + end, #{ type => list_to_atom(Protocol), name => <<"demo">>, @@ -55,20 +62,17 @@ values(Protocol, post) -> enable => true, direction => egress, local_topic => <<"local/topic/#">>, - measurement => <<"${topic}">>, - tags => #{<<"clientid">> => <<"${clientid}">>}, - fields => #{ - <<"payload">> => <<"${payload}">>, - <<"int_value">> => [int, <<"${payload.int_key}">>], - <<"uint_value">> => [uint, <<"${payload.uint_key}">>] - } + write_syntax => + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, + "bool=${payload.bool}">> }; values(Protocol, put) -> values(Protocol, post). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge". +namespace() -> "bridge_influxdb". roots() -> []. @@ -77,13 +81,7 @@ fields(basic) -> {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})}, - {timestamp, - mk(binary(), #{ - desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false - })}, - {tags, mk(map(), #{desc => ?DESC("tags"), required => false})}, - {fields, mk(map(), #{desc => ?DESC("fields"), required => true})} + {write_syntax, fun write_syntax/1} ]; fields("post_udp") -> method_fileds(post, influxdb_udp); @@ -139,6 +137,89 @@ type_name_field(Type) -> desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for HStream using `", string:to_upper(Method), "` method."]; + ["Configuration for InfluxDB using `", string:to_upper(Method), "` method."]; +desc(influxdb_udp) -> + ?DESC(emqx_ee_connector_influxdb, "influxdb_udp"); +desc(influxdb_api_v1) -> + ?DESC(emqx_ee_connector_influxdb, "influxdb_api_v1"); +desc(influxdb_api_v2) -> + ?DESC(emqx_ee_connector_influxdb, "influxdb_api_v2"); desc(_) -> undefined. + +write_syntax(type) -> + list(); +write_syntax(required) -> + true; +write_syntax(validator) -> + [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; +write_syntax(converter) -> + fun converter_influx_lines/1; +write_syntax(desc) -> + ?DESC("write_syntax"); +write_syntax(_) -> + undefined. + +converter_influx_lines(RawLines) -> + Lines = string:tokens(str(RawLines), "\n"), + lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)). + +converter_influx_line(Line, AccIn) -> + case string:tokens(str(Line), " ") of + [MeasurementAndTags, Fields, Timestamp] -> + {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), + [ + #{ + measurement => Measurement, + tags => kv_pairs(Tags), + fields => kv_pairs(string:tokens(Fields, ",")), + timestamp => Timestamp + } + | AccIn + ]; + [MeasurementAndTags, Fields] -> + {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), + %% TODO: fix here both here and influxdb driver. + %% Default value should evaluated by InfluxDB. + [ + #{ + measurement => Measurement, + tags => kv_pairs(Tags), + fields => kv_pairs(string:tokens(Fields, ",")), + timestamp => "${timestamp}" + } + | AccIn + ]; + _ -> + throw("Bad InfluxDB Line Protocol schema") + end. + +split_measurement_and_tags(Subject) -> + case string:tokens(Subject, ",") of + [] -> + throw("Bad Measurement schema"); + [Measurement] -> + {Measurement, []}; + [Measurement | Tags] -> + {Measurement, Tags} + end. + +kv_pairs(Pairs) -> + kv_pairs(Pairs, []). +kv_pairs([], Acc) -> + lists:reverse(Acc); +kv_pairs([Pair | Rest], Acc) -> + case string:tokens(Pair, "=") of + [K, V] -> + %% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol. + kv_pairs(Rest, [{K, V} | Acc]); + _ -> + throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair)) + end. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl b/lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl similarity index 92% rename from lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl rename to lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl index d03c13cac..429323ad7 100644 --- a/lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(ee_bridge_hstream_SUITE). +-module(ee_bridge_hstreamdb_SUITE). -compile(nowarn_export_all). -compile(export_all). diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstreamdb.conf similarity index 85% rename from lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf rename to lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstreamdb.conf index 080076065..0826c8f0c 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstreamdb.conf @@ -1,5 +1,15 @@ +emqx_ee_connector_hstreamdb { + config { + desc { + en: "HStreamDB connection config" + zh: "HStreamDB 连接配置。" + } + label: { + en: "Connection config" + zh: "连接配置" + } + } -emqx_ee_connector_hstream { type { desc { en: "The Connector Type." diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index a909c9a72..7e223b9b7 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -1,4 +1,3 @@ - emqx_ee_connector_influxdb { type { desc { @@ -43,7 +42,7 @@ emqx_ee_connector_influxdb { } protocol { desc { - en: """InfluxDB protocol. UDP or HTTP API or HTTP API V2""" + en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2""" zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2""" } label: { @@ -51,9 +50,9 @@ emqx_ee_connector_influxdb { zh: """协议""" } } - protocol_udp { + influxdb_udp { desc { - en: """InfluxDB protocol.""" + en: """InfluxDB's UDP protocol.""" zh: """InfluxDB UDP 协议""" } label: { @@ -61,9 +60,9 @@ emqx_ee_connector_influxdb { zh: """UDP 协议""" } } - protocol_api_v1 { + influxdb_api_v1 { desc { - en: """InfluxDB protocol. Support InfluxDB v1.8 and before.""" + en: """InfluxDB's protocol. Support InfluxDB v1.8 and before.""" zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本""" } label: { @@ -71,9 +70,9 @@ emqx_ee_connector_influxdb { zh: """HTTP API 协议""" } } - protocol_api_v2 { + influxdb_api_v2 { desc { - en: """InfluxDB protocol. Support InfluxDB v2.0 and after.""" + en: """InfluxDB's protocol. Support InfluxDB v2.0 and after.""" zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本""" } label: { @@ -123,7 +122,7 @@ emqx_ee_connector_influxdb { } org { desc { - en: """InfluxDB organization name.""" + en: """Organization name of InfluxDB.""" zh: """InfluxDB 组织名称。""" } label: { diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl index 9e8e7bac1..6846ea740 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl @@ -13,17 +13,17 @@ api_schemas(Method) -> [ - ref(emqx_ee_connector_hstream, Method), - ref(emqx_ee_connector_influxdb, Method ++ "_udp"), - ref(emqx_ee_connector_influxdb, Method ++ "_api_v1"), - ref(emqx_ee_connector_influxdb, Method ++ "_api_v2") + ref(emqx_ee_connector_hstreamdb, Method), + ref(emqx_ee_connector_influxdb, "udp_" ++ Method), + ref(emqx_ee_connector_influxdb, "api_v1_" ++ Method), + ref(emqx_ee_connector_influxdb, "api_v2_" ++ Method) ]. fields(connectors) -> [ {hstreamdb, mk( - hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), + hoconsc:map(name, ref(emqx_ee_connector_hstreamdb, config)), #{desc => <<"EMQX Enterprise Config">>} )} ] ++ fields(influxdb); @@ -52,6 +52,6 @@ connector_examples(Method) -> schema_modules() -> [ - emqx_ee_connector_hstream, + emqx_ee_connector_hstreamdb, emqx_ee_connector_influxdb ]. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl similarity index 98% rename from lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl rename to lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl index 7a8ce3fe9..8ee37cd8a 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_connector_hstream). +-module(emqx_ee_connector_hstreamdb). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -24,8 +24,10 @@ ]). -export([ + namespace/0, roots/0, fields/1, + desc/1, connector_examples/1 ]). @@ -75,6 +77,7 @@ on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> %% ------------------------------------------------------------------------------------------------- %% schema +namespace() -> connector_hstreamdb. roots() -> fields(config). @@ -121,6 +124,9 @@ values(put) -> values(_) -> #{}. +desc(config) -> + ?DESC("config"). + %% ------------------------------------------------------------------------------------------------- %% internal functions start_client(InstId, Config) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index c6528ffd0..9582f1729 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -20,7 +20,9 @@ ]). -export([ + namespace/0, fields/1, + desc/1, connector_examples/1 ]). @@ -46,31 +48,32 @@ on_get_status(_InstId, #{client := Client}) -> %% ------------------------------------------------------------------------------------------------- %% schema +namespace() -> connector_influxdb. -fields("put_udp") -> +fields("udp_get") -> + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); +fields("udp_post") -> + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); +fields("udp_put") -> fields(influxdb_udp); -fields("put_api_v1") -> +fields("api_v1_get") -> + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); +fields("api_v1_post") -> + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); +fields("api_v1_put") -> fields(influxdb_api_v1); -fields("put_api_v2") -> +fields("api_v2_get") -> + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); +fields("api_v2_post") -> + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); +fields("api_v2_put") -> fields(influxdb_api_v2); -fields("get_udp") -> - Key = influxdb_udp, - fields(Key) ++ type_name_field(Key); -fields("get_api_v1") -> - Key = influxdb_api_v1, - fields(Key) ++ type_name_field(Key); -fields("get_api_v2") -> - Key = influxdb_api_v2, - fields(Key) ++ type_name_field(Key); -fields("post_udp") -> - Key = influxdb_udp, - fields(Key) ++ type_name_field(Key); -fields("post_api_v1") -> - Key = influxdb_api_v1, - fields(Key) ++ type_name_field(Key); -fields("post_api_v2") -> - Key = influxdb_api_v2, - fields(Key) ++ type_name_field(Key); fields(basic) -> [ {host, @@ -159,6 +162,14 @@ values(api_v2, put) -> token => <<"my_token">>, ssl => #{enable => false} }. + +desc(influxdb_udp) -> + ?DESC("influxdb_udp"); +desc(influxdb_api_v1) -> + ?DESC("influxdb_api_v1"); +desc(influxdb_api_v2) -> + ?DESC("influxdb_api_v2"). + %% ------------------------------------------------------------------------------------------------- %% internal functions @@ -189,10 +200,7 @@ do_start_client( ClientConfig, Config = #{ egress := #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields + write_syntax := Lines } } ) -> @@ -202,10 +210,7 @@ do_start_client( true -> State = #{ client => Client, - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), - tags => to_tags_config(Tags), - fields => to_fields_config(Fields) + write_syntax => to_config(Lines) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -304,46 +309,65 @@ ssl_config(SSL = #{enable := true}) -> %% Query do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> - case data_to_point(Data, State) of - {ok, Point} -> - case influxdb:write(Client, [Point]) of - ok -> - ?SLOG(debug, #{ - msg => "influxdb write point success", - connector => InstId, - point => Point - }), - emqx_resource:query_success(AfterQuery); - {error, Reason} -> - ?SLOG(error, #{ - msg => "influxdb write point failed", - connector => InstId, - reason => Reason - }), - emqx_resource:query_failed(AfterQuery) - end; - {error, Reason} -> + {Points, Errs} = data_to_points(Data, State), + lists:foreach( + fun({error, Reason}) -> ?SLOG(error, #{ msg => "influxdb trans point failed", connector => InstId, reason => Reason + }) + end, + Errs + ), + case influxdb:write(Client, Points) of + ok -> + ?SLOG(debug, #{ + msg => "influxdb write point success", + connector => InstId, + points => Points }), - {error, Reason} + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb write point failed", + connector => InstId, + reason => Reason + }), + emqx_resource:query_failed(AfterQuery) end. %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans -to_tags_config(Tags) -> - maps:fold(fun to_maps_config/3, #{}, Tags). +to_config(Lines) -> + to_config(Lines, []). -to_fields_config(Fields) -> - maps:fold(fun to_maps_config/3, #{}, Fields). +to_config([], Acc) -> + lists:reverse(Acc); +to_config( + [ + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + | Rest + ], + Acc +) -> + Res = #{ + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), + tags => to_kv_config(Tags), + fields => to_kv_config(Fields) + }, + to_config(Rest, [Res | Acc]). + +to_kv_config(KVfields) -> + maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). -to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> -> - NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), - NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), - Res#{NK => {binary_to_atom(IntType, utf8), NV}}; to_maps_config(K, V, Res) -> NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), @@ -351,14 +375,24 @@ to_maps_config(K, V, Res) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans -data_to_point( +data_to_points(Data, #{write_syntax := Lines}) -> + lines_to_points(Data, Lines, [], []). + +lines_to_points(_, [], Points, Errs) -> + {Points, Errs}; +lines_to_points( Data, - #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields - } + [ + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + | Rest + ], + ResAcc, + ErrAcc ) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of @@ -371,36 +405,54 @@ data_to_point( tags => EncodeTags, fields => EncodeFields }, - {ok, Point}; + lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc); BadTimestamp -> - {error, {bad_timestamp, BadTimestamp}} + lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc]) end. -maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint -> - TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), - NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), - case {NK, NV} of - {[undefined], _} -> - {Data, Res}; - {_, [undefined]} -> - {Data, Res}; - {_, [IntV]} when is_integer(IntV) -> - {Data, Res#{NK => {IntType, IntV}}} - end; maps_config_to_data(K, V, {Data, Res}) -> - TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), - NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), + KTransOptions = #{return => full_binary}, + VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), + NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), case {NK, NV} of {[undefined], _} -> {Data, Res}; {_, [undefined]} -> {Data, Res}; _ -> - {Data, Res#{bin(NK) => NV}} + {Data, Res#{NK => value_type(NV)}} end. +value_type([Int, <<"i">>]) when + is_integer(Int) +-> + {int, Int}; +value_type([UInt, <<"u">>]) -> + {uint, UInt}; +value_type([<<"t">>]) -> + 't'; +value_type([<<"T">>]) -> + 'T'; +value_type([true]) -> + 'true'; +value_type([<<"TRUE">>]) -> + 'TRUE'; +value_type([<<"True">>]) -> + 'True'; +value_type([<<"f">>]) -> + 'f'; +value_type([<<"F">>]) -> + 'F'; +value_type([false]) -> + 'false'; +value_type([<<"FALSE">>]) -> + 'FALSE'; +value_type([<<"False">>]) -> + 'False'; +value_type(Val) -> + Val. + data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; data_filter(Number) when is_number(Number) -> Number; diff --git a/lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl b/lib-ee/emqx_ee_connector/test/ee_connector_hstreamdb_SUITE.erl similarity index 91% rename from lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl rename to lib-ee/emqx_ee_connector/test/ee_connector_hstreamdb_SUITE.erl index cebe77c6a..4de456b2b 100644 --- a/lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/ee_connector_hstreamdb_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(ee_connector_hstream_SUITE). +-module(ee_connector_hstreamdb_SUITE). -compile(nowarn_export_all). -compile(export_all). diff --git a/scripts/spellcheck b/scripts/spellcheck index 51d8d2907..f8af8c3f6 100755 --- a/scripts/spellcheck +++ b/scripts/spellcheck @@ -7,7 +7,7 @@ else SCHEMA="$1" fi -docker run -d --name langtool "ghcr.io/emqx/emqx-schema-validate:0.3.3" +docker run -d --name langtool "ghcr.io/emqx/emqx-schema-validate:0.3.5" docker exec -i langtool ./emqx_schema_validate - < "${SCHEMA}" success="$?"