Merge pull request #8663 from JimMoen/refine-influxdb-conf-api

refine influxdb conf api
This commit is contained in:
JimMoen 2022-08-09 16:22:50 +08:00 committed by GitHub
commit cec47ba4f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 299 additions and 164 deletions

View File

@ -205,7 +205,7 @@ jobs:
- emqx - emqx
- emqx-enterprise - emqx-enterprise
runs-on: aws-amd64 runs-on: aws-amd64
container: "ghcr.io/emqx/emqx-schema-validate:0.3.3" container: "ghcr.io/emqx/emqx-schema-validate:0.3.5"
steps: steps:
- uses: actions/download-artifact@v2 - uses: actions/download-artifact@v2
name: Download schema dump name: Download schema dump

View File

@ -9,7 +9,7 @@
%%====================================================================================== %%======================================================================================
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge". namespace() -> "bridge_webhook".
roots() -> []. roots() -> [].

View File

@ -1,4 +1,4 @@
emqx_ee_bridge_hstream { emqx_ee_bridge_hstreamdb {
local_topic { local_topic {
desc { desc {
en: """ en: """

View File

@ -18,44 +18,30 @@ will be forwarded.
zh: "本地 Topic" zh: "本地 Topic"
} }
} }
measurement { write_syntax {
desc { desc {
en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported.""" en: """
zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符""" Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported.
See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
TLDR:
```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```
"""
zh: """
使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
TLDR:
```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```
"""
} }
label { label {
en: "Measurement" en: "write_syntax"
zh: "Measurement" zh: "写语句"
}
}
timestamp {
desc {
en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp"""
zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳"""
}
label {
en: "Timestamp"
zh: "Timestamp"
}
}
tags {
desc {
en: """The tags to be forwarded to the InfluxDB. Placeholders supported."""
zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符"""
}
label {
en: "Tags"
zh: "Tags"
}
}
fields {
desc {
en: """The fields to be forwarded to the InfluxDB. Placeholders supported."""
zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符"""
}
label {
en: "Fields"
zh: "Fields"
} }
} }
config_enable { config_enable {
@ -111,6 +97,7 @@ will be forwarded.
zh: "桥接名字" zh: "桥接名字"
} }
} }
desc_connector { desc_connector {
desc { desc {
en: """Generic configuration for the connector.""" en: """Generic configuration for the connector."""

View File

@ -14,8 +14,8 @@
api_schemas(Method) -> api_schemas(Method) ->
[ [
ref(emqx_ee_bridge_hstream, Method),
ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_mysql, Method),
ref(emqx_ee_bridge_hstreamdb, Method),
ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2")
@ -23,7 +23,7 @@ api_schemas(Method) ->
schema_modules() -> schema_modules() ->
[ [
emqx_ee_bridge_hstream, emqx_ee_bridge_hstreamdb,
emqx_ee_bridge_influxdb, emqx_ee_bridge_influxdb,
emqx_ee_bridge_mysql emqx_ee_bridge_mysql
]. ].
@ -41,7 +41,7 @@ conn_bridge_examples(Method) ->
lists:foldl(Fun, #{}, schema_modules()). lists:foldl(Fun, #{}, schema_modules()).
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(hstreamdb) -> emqx_ee_connector_hstream; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(mysql) -> emqx_connector_mysql; resource_type(mysql) -> emqx_connector_mysql;
resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
@ -51,7 +51,7 @@ fields(bridges) ->
[ [
{hstreamdb, {hstreamdb,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
#{desc => <<"EMQX Enterprise Config">>} #{desc => <<"EMQX Enterprise Config">>}
)}, )},
{mysql, {mysql,

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_bridge_hstream). -module(emqx_ee_bridge_hstreamdb).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -50,7 +50,7 @@ values(put) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge". namespace() -> "bridge_hstreamdb".
roots() -> []. roots() -> [].
@ -71,7 +71,7 @@ fields("get") ->
field(connector) -> field(connector) ->
mk( mk(
hoconsc:union([binary(), ref(emqx_ee_connector_hstream, config)]), hoconsc:union([binary(), ref(emqx_ee_connector_hstreamdb, config)]),
#{ #{
required => true, required => true,
example => <<"hstreamdb:demo">>, example => <<"hstreamdb:demo">>,

View File

@ -3,9 +3,10 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_bridge_influxdb). -module(emqx_ee_bridge_influxdb).
-include("emqx_ee_bridge.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ee_bridge.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -48,6 +49,12 @@ conn_bridge_examples(Method) ->
values(Protocol, get) -> values(Protocol, get) ->
maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); maps:merge(values(Protocol, post), ?METRICS_EXAMPLE);
values(Protocol, post) -> values(Protocol, post) ->
case Protocol of
"influxdb_api_v2" ->
SupportUint = <<"uint_value=${payload.uint_key}u">>;
_ ->
SupportUint = <<>>
end,
#{ #{
type => list_to_atom(Protocol), type => list_to_atom(Protocol),
name => <<"demo">>, name => <<"demo">>,
@ -55,20 +62,17 @@ values(Protocol, post) ->
enable => true, enable => true,
direction => egress, direction => egress,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
measurement => <<"${topic}">>, write_syntax =>
tags => #{<<"clientid">> => <<"${clientid}">>}, <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
fields => #{ "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
<<"payload">> => <<"${payload}">>, "bool=${payload.bool}">>
<<"int_value">> => [int, <<"${payload.int_key}">>],
<<"uint_value">> => [uint, <<"${payload.uint_key}">>]
}
}; };
values(Protocol, put) -> values(Protocol, put) ->
values(Protocol, post). values(Protocol, post).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge". namespace() -> "bridge_influxdb".
roots() -> []. roots() -> [].
@ -77,13 +81,7 @@ fields(basic) ->
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})}, {write_syntax, fun write_syntax/1}
{timestamp,
mk(binary(), #{
desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false
})},
{tags, mk(map(), #{desc => ?DESC("tags"), required => false})},
{fields, mk(map(), #{desc => ?DESC("fields"), required => true})}
]; ];
fields("post_udp") -> fields("post_udp") ->
method_fileds(post, influxdb_udp); method_fileds(post, influxdb_udp);
@ -139,6 +137,89 @@ type_name_field(Type) ->
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HStream using `", string:to_upper(Method), "` method."]; ["Configuration for InfluxDB using `", string:to_upper(Method), "` method."];
desc(influxdb_udp) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_udp");
desc(influxdb_api_v1) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v1");
desc(influxdb_api_v2) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v2");
desc(_) -> desc(_) ->
undefined. undefined.
write_syntax(type) ->
list();
write_syntax(required) ->
true;
write_syntax(validator) ->
[?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
write_syntax(converter) ->
fun converter_influx_lines/1;
write_syntax(desc) ->
?DESC("write_syntax");
write_syntax(_) ->
undefined.
converter_influx_lines(RawLines) ->
Lines = string:tokens(str(RawLines), "\n"),
lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)).
converter_influx_line(Line, AccIn) ->
case string:tokens(str(Line), " ") of
[MeasurementAndTags, Fields, Timestamp] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| AccIn
];
[MeasurementAndTags, Fields] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
%% TODO: fix here both here and influxdb driver.
%% Default value should evaluated by InfluxDB.
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => "${timestamp}"
}
| AccIn
];
_ ->
throw("Bad InfluxDB Line Protocol schema")
end.
split_measurement_and_tags(Subject) ->
case string:tokens(Subject, ",") of
[] ->
throw("Bad Measurement schema");
[Measurement] ->
{Measurement, []};
[Measurement | Tags] ->
{Measurement, Tags}
end.
kv_pairs(Pairs) ->
kv_pairs(Pairs, []).
kv_pairs([], Acc) ->
lists:reverse(Acc);
kv_pairs([Pair | Rest], Acc) ->
case string:tokens(Pair, "=") of
[K, V] ->
%% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol.
kv_pairs(Rest, [{K, V} | Acc]);
_ ->
throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair))
end.
str(A) when is_atom(A) ->
atom_to_list(A);
str(B) when is_binary(B) ->
binary_to_list(B);
str(S) when is_list(S) ->
S.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(ee_bridge_hstream_SUITE). -module(ee_bridge_hstreamdb_SUITE).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).

View File

@ -1,5 +1,15 @@
emqx_ee_connector_hstreamdb {
config {
desc {
en: "HStreamDB connection config"
zh: "HStreamDB 连接配置。"
}
label: {
en: "Connection config"
zh: "连接配置"
}
}
emqx_ee_connector_hstream {
type { type {
desc { desc {
en: "The Connector Type." en: "The Connector Type."

View File

@ -1,4 +1,3 @@
emqx_ee_connector_influxdb { emqx_ee_connector_influxdb {
type { type {
desc { desc {
@ -43,7 +42,7 @@ emqx_ee_connector_influxdb {
} }
protocol { protocol {
desc { desc {
en: """InfluxDB protocol. UDP or HTTP API or HTTP API V2""" en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2"""
zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2""" zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2"""
} }
label: { label: {
@ -51,9 +50,9 @@ emqx_ee_connector_influxdb {
zh: """协议""" zh: """协议"""
} }
} }
protocol_udp { influxdb_udp {
desc { desc {
en: """InfluxDB protocol.""" en: """InfluxDB's UDP protocol."""
zh: """InfluxDB UDP 协议""" zh: """InfluxDB UDP 协议"""
} }
label: { label: {
@ -61,9 +60,9 @@ emqx_ee_connector_influxdb {
zh: """UDP 协议""" zh: """UDP 协议"""
} }
} }
protocol_api_v1 { influxdb_api_v1 {
desc { desc {
en: """InfluxDB protocol. Support InfluxDB v1.8 and before.""" en: """InfluxDB's protocol. Support InfluxDB v1.8 and before."""
zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本""" zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本"""
} }
label: { label: {
@ -71,9 +70,9 @@ emqx_ee_connector_influxdb {
zh: """HTTP API 协议""" zh: """HTTP API 协议"""
} }
} }
protocol_api_v2 { influxdb_api_v2 {
desc { desc {
en: """InfluxDB protocol. Support InfluxDB v2.0 and after.""" en: """InfluxDB's protocol. Support InfluxDB v2.0 and after."""
zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本""" zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本"""
} }
label: { label: {
@ -123,7 +122,7 @@ emqx_ee_connector_influxdb {
} }
org { org {
desc { desc {
en: """InfluxDB organization name.""" en: """Organization name of InfluxDB."""
zh: """InfluxDB 组织名称。""" zh: """InfluxDB 组织名称。"""
} }
label: { label: {

View File

@ -13,17 +13,17 @@
api_schemas(Method) -> api_schemas(Method) ->
[ [
ref(emqx_ee_connector_hstream, Method), ref(emqx_ee_connector_hstreamdb, Method),
ref(emqx_ee_connector_influxdb, Method ++ "_udp"), ref(emqx_ee_connector_influxdb, "udp_" ++ Method),
ref(emqx_ee_connector_influxdb, Method ++ "_api_v1"), ref(emqx_ee_connector_influxdb, "api_v1_" ++ Method),
ref(emqx_ee_connector_influxdb, Method ++ "_api_v2") ref(emqx_ee_connector_influxdb, "api_v2_" ++ Method)
]. ].
fields(connectors) -> fields(connectors) ->
[ [
{hstreamdb, {hstreamdb,
mk( mk(
hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), hoconsc:map(name, ref(emqx_ee_connector_hstreamdb, config)),
#{desc => <<"EMQX Enterprise Config">>} #{desc => <<"EMQX Enterprise Config">>}
)} )}
] ++ fields(influxdb); ] ++ fields(influxdb);
@ -52,6 +52,6 @@ connector_examples(Method) ->
schema_modules() -> schema_modules() ->
[ [
emqx_ee_connector_hstream, emqx_ee_connector_hstreamdb,
emqx_ee_connector_influxdb emqx_ee_connector_influxdb
]. ].

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_connector_hstream). -module(emqx_ee_connector_hstreamdb).
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
@ -24,8 +24,10 @@
]). ]).
-export([ -export([
namespace/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1,
connector_examples/1 connector_examples/1
]). ]).
@ -75,6 +77,7 @@ on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% schema %% schema
namespace() -> connector_hstreamdb.
roots() -> roots() ->
fields(config). fields(config).
@ -121,6 +124,9 @@ values(put) ->
values(_) -> values(_) ->
#{}. #{}.
desc(config) ->
?DESC("config").
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal functions %% internal functions
start_client(InstId, Config) -> start_client(InstId, Config) ->

View File

@ -20,7 +20,9 @@
]). ]).
-export([ -export([
namespace/0,
fields/1, fields/1,
desc/1,
connector_examples/1 connector_examples/1
]). ]).
@ -46,31 +48,32 @@ on_get_status(_InstId, #{client := Client}) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% schema %% schema
namespace() -> connector_influxdb.
fields("put_udp") -> fields("udp_get") ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("udp_post") ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("udp_put") ->
fields(influxdb_udp); fields(influxdb_udp);
fields("put_api_v1") -> fields("api_v1_get") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("api_v1_post") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("api_v1_put") ->
fields(influxdb_api_v1); fields(influxdb_api_v1);
fields("put_api_v2") -> fields("api_v2_get") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("api_v2_post") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("api_v2_put") ->
fields(influxdb_api_v2); fields(influxdb_api_v2);
fields("get_udp") ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("get_api_v1") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("get_api_v2") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields("post_udp") ->
Key = influxdb_udp,
fields(Key) ++ type_name_field(Key);
fields("post_api_v1") ->
Key = influxdb_api_v1,
fields(Key) ++ type_name_field(Key);
fields("post_api_v2") ->
Key = influxdb_api_v2,
fields(Key) ++ type_name_field(Key);
fields(basic) -> fields(basic) ->
[ [
{host, {host,
@ -159,6 +162,14 @@ values(api_v2, put) ->
token => <<"my_token">>, token => <<"my_token">>,
ssl => #{enable => false} ssl => #{enable => false}
}. }.
desc(influxdb_udp) ->
?DESC("influxdb_udp");
desc(influxdb_api_v1) ->
?DESC("influxdb_api_v1");
desc(influxdb_api_v2) ->
?DESC("influxdb_api_v2").
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal functions %% internal functions
@ -189,10 +200,7 @@ do_start_client(
ClientConfig, ClientConfig,
Config = #{ Config = #{
egress := #{ egress := #{
measurement := Measurement, write_syntax := Lines
timestamp := Timestamp,
tags := Tags,
fields := Fields
} }
} }
) -> ) ->
@ -202,10 +210,7 @@ do_start_client(
true -> true ->
State = #{ State = #{
client => Client, client => Client,
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), write_syntax => to_config(Lines)
timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
tags => to_tags_config(Tags),
fields => to_fields_config(Fields)
}, },
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting influxdb connector success", msg => "starting influxdb connector success",
@ -304,14 +309,23 @@ ssl_config(SSL = #{enable := true}) ->
%% Query %% Query
do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) ->
case data_to_point(Data, State) of {Points, Errs} = data_to_points(Data, State),
{ok, Point} -> lists:foreach(
case influxdb:write(Client, [Point]) of fun({error, Reason}) ->
?SLOG(error, #{
msg => "influxdb trans point failed",
connector => InstId,
reason => Reason
})
end,
Errs
),
case influxdb:write(Client, Points) of
ok -> ok ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "influxdb write point success", msg => "influxdb write point success",
connector => InstId, connector => InstId,
point => Point points => Points
}), }),
emqx_resource:query_success(AfterQuery); emqx_resource:query_success(AfterQuery);
{error, Reason} -> {error, Reason} ->
@ -321,29 +335,39 @@ do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client})
reason => Reason reason => Reason
}), }),
emqx_resource:query_failed(AfterQuery) emqx_resource:query_failed(AfterQuery)
end;
{error, Reason} ->
?SLOG(error, #{
msg => "influxdb trans point failed",
connector => InstId,
reason => Reason
}),
{error, Reason}
end. end.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans
to_tags_config(Tags) -> to_config(Lines) ->
maps:fold(fun to_maps_config/3, #{}, Tags). to_config(Lines, []).
to_fields_config(Fields) -> to_config([], Acc) ->
maps:fold(fun to_maps_config/3, #{}, Fields). lists:reverse(Acc);
to_config(
[
#{
measurement := Measurement,
timestamp := Timestamp,
tags := Tags,
fields := Fields
}
| Rest
],
Acc
) ->
Res = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
tags => to_kv_config(Tags),
fields => to_kv_config(Fields)
},
to_config(Rest, [Res | Acc]).
to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> ->
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
Res#{NK => {binary_to_atom(IntType, utf8), NV}};
to_maps_config(K, V, Res) -> to_maps_config(K, V, Res) ->
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
@ -351,14 +375,24 @@ to_maps_config(K, V, Res) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Data Trans %% Tags & Fields Data Trans
data_to_point( data_to_points(Data, #{write_syntax := Lines}) ->
lines_to_points(Data, Lines, [], []).
lines_to_points(_, [], Points, Errs) ->
{Points, Errs};
lines_to_points(
Data, Data,
[
#{ #{
measurement := Measurement, measurement := Measurement,
timestamp := Timestamp, timestamp := Timestamp,
tags := Tags, tags := Tags,
fields := Fields fields := Fields
} }
| Rest
],
ResAcc,
ErrAcc
) -> ) ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
@ -371,36 +405,54 @@ data_to_point(
tags => EncodeTags, tags => EncodeTags,
fields => EncodeFields fields => EncodeFields
}, },
{ok, Point}; lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc);
BadTimestamp -> BadTimestamp ->
{error, {bad_timestamp, BadTimestamp}} lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc])
end. end.
maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions),
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions),
case {NK, NV} of
{[undefined], _} ->
{Data, Res};
{_, [undefined]} ->
{Data, Res};
{_, [IntV]} when is_integer(IntV) ->
{Data, Res#{NK => {IntType, IntV}}}
end;
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res}) ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, KTransOptions = #{return => full_binary},
NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
case {NK, NV} of case {NK, NV} of
{[undefined], _} -> {[undefined], _} ->
{Data, Res}; {Data, Res};
{_, [undefined]} -> {_, [undefined]} ->
{Data, Res}; {Data, Res};
_ -> _ ->
{Data, Res#{bin(NK) => NV}} {Data, Res#{NK => value_type(NV)}}
end. end.
value_type([Int, <<"i">>]) when
is_integer(Int)
->
{int, Int};
value_type([UInt, <<"u">>]) ->
{uint, UInt};
value_type([<<"t">>]) ->
't';
value_type([<<"T">>]) ->
'T';
value_type([true]) ->
'true';
value_type([<<"TRUE">>]) ->
'TRUE';
value_type([<<"True">>]) ->
'True';
value_type([<<"f">>]) ->
'f';
value_type([<<"F">>]) ->
'F';
value_type([false]) ->
'false';
value_type([<<"FALSE">>]) ->
'FALSE';
value_type([<<"False">>]) ->
'False';
value_type(Val) ->
Val.
data_filter(undefined) -> undefined; data_filter(undefined) -> undefined;
data_filter(Int) when is_integer(Int) -> Int; data_filter(Int) when is_integer(Int) -> Int;
data_filter(Number) when is_number(Number) -> Number; data_filter(Number) when is_number(Number) -> Number;

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(ee_connector_hstream_SUITE). -module(ee_connector_hstreamdb_SUITE).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).

View File

@ -7,7 +7,7 @@ else
SCHEMA="$1" SCHEMA="$1"
fi fi
docker run -d --name langtool "ghcr.io/emqx/emqx-schema-validate:0.3.3" docker run -d --name langtool "ghcr.io/emqx/emqx-schema-validate:0.3.5"
docker exec -i langtool ./emqx_schema_validate - < "${SCHEMA}" docker exec -i langtool ./emqx_schema_validate - < "${SCHEMA}"
success="$?" success="$?"