Merge pull request #10041 from zmstone/0228-add-influx-line-protocol-desc
0228 add influx line protocol desc
This commit is contained in:
commit
8d63c6c08d
|
@ -0,0 +1,2 @@
|
||||||
|
For influxdb bridge, added integer value placeholder annotation hint to `write_syntax` documentation.
|
||||||
|
Also supported setting a constant value for the `timestamp` field.
|
|
@ -0,0 +1,2 @@
|
||||||
|
为 influxdb 桥接的配置项 `write_syntax` 描述文档增加了类型标识符的提醒。
|
||||||
|
另外在配置中支持 `timestamp` 使用一个常量。
|
|
@ -22,14 +22,16 @@ See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/
|
||||||
TLDR:</br>
|
TLDR:</br>
|
||||||
```
|
```
|
||||||
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
||||||
```"""
|
```
|
||||||
|
Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`."""
|
||||||
zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
|
zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
|
||||||
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
|
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
|
||||||
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
|
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
|
||||||
TLDR: </br>
|
TLDR: </br>
|
||||||
```
|
```
|
||||||
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
||||||
```"""
|
```
|
||||||
|
注意,整形数值占位符后需要添加一个字符 `i` 类型标识。例如 `${payload.int_value}i`"""
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: "Write Syntax"
|
en: "Write Syntax"
|
||||||
|
|
|
@ -663,6 +663,54 @@ t_start_ok_no_subject_tags_write_syntax(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_const_timestamp(Config) ->
|
||||||
|
QueryMode = ?config(query_mode, Config),
|
||||||
|
Const = erlang:system_time(nanosecond),
|
||||||
|
ConstBin = integer_to_binary(Const),
|
||||||
|
TsStr = iolist_to_binary(
|
||||||
|
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
<<"write_syntax">> =>
|
||||||
|
<<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = #{<<"foo">> => 123},
|
||||||
|
SentData = #{
|
||||||
|
<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
<<"payload">> => Payload
|
||||||
|
},
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
case QueryMode of
|
||||||
|
async -> ct:sleep(500);
|
||||||
|
sync -> ok
|
||||||
|
end,
|
||||||
|
PersistedData = query_by_clientid(ClientId, Config),
|
||||||
|
Expected = #{foo => <<"123">>},
|
||||||
|
assert_persisted_data(ClientId, Expected, PersistedData),
|
||||||
|
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)),
|
||||||
|
TimeReturned = pad_zero(TimeReturned0),
|
||||||
|
?assertEqual(TsStr, TimeReturned).
|
||||||
|
|
||||||
|
%% influxdb returns timestamps without trailing zeros such as
|
||||||
|
%% "2023-02-28T17:21:51.63678163Z"
|
||||||
|
%% while the standard should be
|
||||||
|
%% "2023-02-28T17:21:51.636781630Z"
|
||||||
|
pad_zero(BinTs) ->
|
||||||
|
StrTs = binary_to_list(BinTs),
|
||||||
|
[Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")),
|
||||||
|
[$Z | NanoNum] = lists:reverse(Nano),
|
||||||
|
Padding = lists:duplicate(10 - length(Nano), $0),
|
||||||
|
NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z",
|
||||||
|
iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")).
|
||||||
|
|
||||||
t_boolean_variants(Config) ->
|
t_boolean_variants(Config) ->
|
||||||
QueryMode = ?config(query_mode, Config),
|
QueryMode = ?config(query_mode, Config),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -783,7 +831,7 @@ t_bad_timestamp(Config) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
error := [
|
error := [
|
||||||
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
|
{error, {bad_timestamp, <<"bad_timestamp">>}}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
@ -793,7 +841,7 @@ t_bad_timestamp(Config) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error,
|
{error,
|
||||||
{unrecoverable_error, [
|
{unrecoverable_error, [
|
||||||
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
|
{error, {bad_timestamp, <<"bad_timestamp">>}}
|
||||||
]}},
|
]}},
|
||||||
Return
|
Return
|
||||||
);
|
);
|
||||||
|
|
|
@ -490,11 +490,11 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
|
||||||
is_list(Ts)
|
is_list(Ts)
|
||||||
->
|
->
|
||||||
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||||
case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of
|
case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of
|
||||||
[TsInt] when is_integer(TsInt) ->
|
{ok, TsInt} ->
|
||||||
Item1 = Item#{timestamp => TsInt},
|
Item1 = Item#{timestamp => TsInt},
|
||||||
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
|
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
|
||||||
BadTs ->
|
{error, BadTs} ->
|
||||||
lines_to_points(Data, Rest, ResultPointsAcc, [
|
lines_to_points(Data, Rest, ResultPointsAcc, [
|
||||||
{error, {bad_timestamp, BadTs}} | ErrorPointsAcc
|
{error, {bad_timestamp, BadTs}} | ErrorPointsAcc
|
||||||
])
|
])
|
||||||
|
@ -504,6 +504,16 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
|
||||||
->
|
->
|
||||||
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
|
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
|
||||||
|
|
||||||
|
parse_timestamp([TsInt]) when is_integer(TsInt) ->
|
||||||
|
{ok, TsInt};
|
||||||
|
parse_timestamp([TsBin]) ->
|
||||||
|
try
|
||||||
|
{ok, binary_to_integer(TsBin)}
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
{error, TsBin}
|
||||||
|
end.
|
||||||
|
|
||||||
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
|
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
|
||||||
case line_to_point(Data, Item) of
|
case line_to_point(Data, Item) of
|
||||||
#{fields := Fields} when map_size(Fields) =:= 0 ->
|
#{fields := Fields} when map_size(Fields) =:= 0 ->
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
# Test influxdb integration
|
||||||
|
|
||||||
|
This script starts two EMQX nodes and a influxdb server in docker container.
|
||||||
|
The bootstraping rule engine and data bridge config is provided in influx-bridge.conf
|
||||||
|
which got included in the bootstraping config bundle emqx.conf.
|
||||||
|
|
||||||
|
## Start the cluster
|
||||||
|
|
||||||
|
./start.sh
|
||||||
|
|
||||||
|
## How to run tests
|
||||||
|
|
||||||
|
The rule and bridge are configured to pipe data from MQTT topic `t/#` to the 'myvalues' measurement in the 'mqtt' bucket.
|
||||||
|
|
||||||
|
### Manual verification steps
|
||||||
|
|
||||||
|
* Start the cluster
|
||||||
|
* Send mqtt messages to topic `/t/a` with a JSON object as MQTT paylaod like `{"value": 1}`
|
||||||
|
* Observe data in influxdb `curl -k -H 'Authorization: Token abcdefg' -G 'https://localhost:8086/query?pretty=true' --data-urlencode "db=mqtt" --data-urlencode "q=SELECT * from myvalues"`
|
||||||
|
|
||||||
|
Example output the curl query against influxdb:
|
||||||
|
|
||||||
|
```
|
||||||
|
{"results":[{"statement_id":0,"series":[{"name":"myvalues","columns":["time","clientid","value"],"values":[["2023-02-28T11:13:29.039Z","a1",123]]}]}]
|
||||||
|
```
|
|
@ -30,7 +30,7 @@ bridges {
|
||||||
versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
|
versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
|
||||||
}
|
}
|
||||||
token = "abcdefg"
|
token = "abcdefg"
|
||||||
write_syntax = "mqtt,clientid=${clientid} value=${payload.value}"
|
write_syntax = "myvalues,clientid=${clientid} value=${payload.value}i"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue