From 24f476e35f47000beab963beeb2564ec8f0be093 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 28 Feb 2023 12:15:37 +0100 Subject: [PATCH 1/3] test: add README to influxdb test script --- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 52 ++++++++++++++++++- .../src/emqx_ee_connector_influxdb.erl | 16 ++++-- scripts/test/influx/README.md | 25 +++++++++ scripts/test/influx/influx-bridge.conf | 2 +- 4 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 scripts/test/influx/README.md diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index bbde88cc7..6e5220ae0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -663,6 +663,54 @@ t_start_ok_no_subject_tags_write_syntax(Config) -> ), ok. +t_const_timestamp(Config) -> + QueryMode = ?config(query_mode, Config), + Const = erlang:system_time(nanosecond), + ConstBin = integer_to_binary(Const), + TsStr = iolist_to_binary( + calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}]) + ), + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"write_syntax">> => + <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>> + } + ) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{<<"foo">> => 123}, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload + }, + ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + async -> ct:sleep(500); + sync -> ok + end, + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{foo => <<"123">>}, + assert_persisted_data(ClientId, Expected, PersistedData), + TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), + TimeReturned = pad_zero(TimeReturned0), + ?assertEqual(TsStr, TimeReturned). + +%% influxdb returns timestamps without trailing zeros such as +%% "2023-02-28T17:21:51.63678163Z" +%% while the standard should be +%% "2023-02-28T17:21:51.636781630Z" +pad_zero(BinTs) -> + StrTs = binary_to_list(BinTs), + [Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")), + [$Z | NanoNum] = lists:reverse(Nano), + Padding = lists:duplicate(10 - length(Nano), $0), + NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z", + iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")). + t_boolean_variants(Config) -> QueryMode = ?config(query_mode, Config), ?assertMatch( @@ -783,7 +831,7 @@ t_bad_timestamp(Config) -> [ #{ error := [ - {error, {bad_timestamp, [<<"bad_timestamp">>]}} + {error, {bad_timestamp, <<"bad_timestamp">>}} ] } ], @@ -793,7 +841,7 @@ t_bad_timestamp(Config) -> ?assertEqual( {error, {unrecoverable_error, [ - {error, {bad_timestamp, [<<"bad_timestamp">>]}} + {error, {bad_timestamp, <<"bad_timestamp">>}} ]}}, Return ); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 785ec5d07..a361e7035 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -490,11 +490,11 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error is_list(Ts) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of - [TsInt] when is_integer(TsInt) -> + case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of + {ok, TsInt} -> Item1 = Item#{timestamp => TsInt}, continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); - BadTs -> + {error, BadTs} -> lines_to_points(Data, Rest, ResultPointsAcc, [ {error, {bad_timestamp, BadTs}} | ErrorPointsAcc ]) @@ -504,6 +504,16 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error -> continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). +parse_timestamp([TsInt]) when is_integer(TsInt) -> + {ok, TsInt}; +parse_timestamp([TsBin]) -> + try + {ok, binary_to_integer(TsBin)} + catch + _:_ -> + {error, TsBin} + end. + continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> case line_to_point(Data, Item) of #{fields := Fields} when map_size(Fields) =:= 0 -> diff --git a/scripts/test/influx/README.md b/scripts/test/influx/README.md new file mode 100644 index 000000000..ee492e69e --- /dev/null +++ b/scripts/test/influx/README.md @@ -0,0 +1,25 @@ +# Test influxdb integration + +This script starts two EMQX nodes and a influxdb server in docker container. +The bootstraping rule engine and data bridge config is provided in influx-bridge.conf +which got included in the bootstraping config bundle emqx.conf. + +## Start the cluster + +./start.sh + +## How to run tests + +The rule and bridge are configured to pipe data from MQTT topic `t/#` to the 'myvalues' measurement in the 'mqtt' bucket. + +### Manual verification steps + +* Start the cluster +* Send mqtt messages to topic `/t/a` with a JSON object as MQTT paylaod like `{"value": 1}` +* Observe data in influxdb `curl -k -H 'Authorization: Token abcdefg' -G 'https://localhost:8086/query?pretty=true' --data-urlencode "db=mqtt" --data-urlencode "q=SELECT * from myvalues"` + +Example output the curl query against influxdb: + +``` +{"results":[{"statement_id":0,"series":[{"name":"myvalues","columns":["time","clientid","value"],"values":[["2023-02-28T11:13:29.039Z","a1",123]]}]}] +``` diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 3b5bb9f9f..df10a0ec6 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -30,7 +30,7 @@ bridges { versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] } token = "abcdefg" - write_syntax = "mqtt,clientid=${clientid} value=${payload.value}" + write_syntax = "myvalues,clientid=${clientid} value=${payload.value}i" } } } From 41cb7263ca364fa6ee267e68070d94ddc6fa926f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 28 Feb 2023 18:53:06 +0100 Subject: [PATCH 2/3] docs(influxdb): add a node to the write_syntax config --- lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 8b2eadcfa..d73d62b14 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -22,14 +22,16 @@ See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/ TLDR:
``` [,=[,=]] =[,=] [] -```""" +``` +Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`.""" zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及 [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
TLDR:
``` [,=[,=]] =[,=] [] -```""" +``` +注意,整形数值占位符后需要添加一个字符 `i` 类型标识。例如 `${payload.int_value}i`""" } label { en: "Write Syntax" From 23fb924e54f251d4ff7283733738e636105e20e3 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 28 Feb 2023 19:00:21 +0100 Subject: [PATCH 3/3] docs: add changelogs --- changes/ce/fix-10041.en.md | 2 ++ changes/ce/fix-10041.zh.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 changes/ce/fix-10041.en.md create mode 100644 changes/ce/fix-10041.zh.md diff --git a/changes/ce/fix-10041.en.md b/changes/ce/fix-10041.en.md new file mode 100644 index 000000000..c1aff24c2 --- /dev/null +++ b/changes/ce/fix-10041.en.md @@ -0,0 +1,2 @@ +For influxdb bridge, added integer value placeholder annotation hint to `write_syntax` documentation. +Also supported setting a constant value for the `timestamp` field. diff --git a/changes/ce/fix-10041.zh.md b/changes/ce/fix-10041.zh.md new file mode 100644 index 000000000..d197ea81f --- /dev/null +++ b/changes/ce/fix-10041.zh.md @@ -0,0 +1,2 @@ +为 influxdb 桥接的配置项 `write_syntax` 描述文档增加了类型标识符的提醒。 +另外在配置中支持 `timestamp` 使用一个常量。