Merge pull request #12880 from JimMoen/EMQX-12147/influx-write-syntax-tag-type

fix(influx): literal number values in tag set
This commit is contained in:
JimMoen 2024-04-17 10:23:57 +08:00 committed by GitHub
commit 1dfd9115cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 76 additions and 14 deletions

View File

@ -59,6 +59,9 @@
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
-define(set_tag, set_tag).
-define(set_field, set_field).
-define(IS_HTTP_ERROR(STATUS_CODE), -define(IS_HTTP_ERROR(STATUS_CODE),
(is_integer(STATUS_CODE) andalso (is_integer(STATUS_CODE) andalso
(STATUS_CODE < 200 orelse STATUS_CODE >= 300)) (STATUS_CODE < 200 orelse STATUS_CODE >= 300))
@ -710,8 +713,8 @@ line_to_point(
precision := Precision precision := Precision
} = Item } = Item
) -> ) ->
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedTags, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_tag}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), {_, EncodedFields, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_field}, Fields),
maps:without([precision], Item#{ maps:without([precision], Item#{
measurement => emqx_placeholder:proc_tmpl(Measurement, Data), measurement => emqx_placeholder:proc_tmpl(Measurement, Data),
tags => EncodedTags, tags => EncodedTags,
@ -727,34 +730,43 @@ time_unit(ms) -> millisecond;
time_unit(us) -> microsecond; time_unit(us) -> microsecond;
time_unit(ns) -> nanosecond. time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res, SetType}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
NV = proc_quoted(V, Data, VTransOptions), NV = proc_quoted(V, Data, VTransOptions),
case {NK, NV} of case {NK, NV} of
{[undefined], _} -> {[undefined], _} ->
{Data, Res}; {Data, Res, SetType};
%% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
{_, [undefined | _]} -> {_, [undefined | _]} ->
{Data, Res}; {Data, Res, SetType};
{_, {quoted, [undefined | _]}} -> {_, {quoted, [undefined | _]}} ->
{Data, Res}; {Data, Res, SetType};
_ -> _ ->
{Data, Res#{ NRes = Res#{
list_to_binary(NK) => value_type(NV, tmpl_type(V)) list_to_binary(NK) => value_type(NV, #{
}} tmpl_type => tmpl_type(V), set_type => SetType
})
},
{Data, NRes, SetType}
end. end.
value_type([Number], #{set_type := ?set_tag}) when is_number(Number) ->
%% all `tag` values are treated as string
%% See also: https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set
emqx_utils_conv:bin(Number);
value_type([Str], #{set_type := ?set_tag}) when is_binary(Str) ->
Str;
value_type({quoted, ValList}, _) -> value_type({quoted, ValList}, _) ->
{string_list, ValList}; {string_list, ValList};
value_type([Int, <<"i">>], mixed) when is_integer(Int) -> value_type([Int, <<"i">>], #{tmpl_type := mixed}) when is_integer(Int) ->
{int, Int}; {int, Int};
value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> value_type([UInt, <<"u">>], #{tmpl_type := mixed}) when is_integer(UInt) ->
{uint, UInt}; {uint, UInt};
%% write `1`, `1.0`, `-1.0` all as float %% write `1`, `1.0`, `-1.0` all as float
%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
value_type([Number], _) when is_number(Number) -> value_type([Number], #{set_type := ?set_field}) when is_number(Number) ->
{float, Number}; {float, Number};
value_type([<<"t">>], _) -> value_type([<<"t">>], _) ->
't'; 't';
@ -776,9 +788,9 @@ value_type([<<"FALSE">>], _) ->
'FALSE'; 'FALSE';
value_type([<<"False">>], _) -> value_type([<<"False">>], _) ->
'False'; 'False';
value_type([Str], variable) when is_binary(Str) -> value_type([Str], #{tmpl_type := variable}) when is_binary(Str) ->
Str; Str;
value_type([Str], literal) when is_binary(Str) -> value_type([Str], #{tmpl_type := literal, set_type := ?set_field}) when is_binary(Str) ->
%% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint. %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint.
%% otherwise, we should convert it to float. %% otherwise, we should convert it to float.
NumStr = binary:part(Str, 0, byte_size(Str) - 1), NumStr = binary:part(Str, 0, byte_size(Str) - 1),

View File

@ -864,6 +864,53 @@ t_any_num_as_float(Config) ->
TimeReturned = pad_zero(TimeReturned0), TimeReturned = pad_zero(TimeReturned0),
?assertEqual(TsStr, TimeReturned). ?assertEqual(TsStr, TimeReturned).
t_tag_set_use_literal_value(Config) ->
QueryMode = ?config(query_mode, Config),
Const = erlang:system_time(nanosecond),
ConstBin = integer_to_binary(Const),
TsStr = iolist_to_binary(
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
),
?assertMatch(
{ok, _},
create_bridge(
Config,
#{
<<"write_syntax">> =>
<<"mqtt,clientid=${clientid},tag_key1=100,tag_key2=123.4,tag_key3=66i,tag_key4=${payload.float_dp}",
" ",
"field_key1=100.1,field_key2=100i,field_key3=${payload.float_dp},bar=5i",
" ", ConstBin/binary>>
}
)
),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{
%% with decimal point
float_dp => 123.4
},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData)),
ok;
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
%% sleep is still need even in sync mode, or we would get an empty result sometimes
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{field_key1 => <<"100.1">>, field_key2 => <<"100">>, field_key3 => <<"123.4">>},
assert_persisted_data(ClientId, Expected, PersistedData),
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"field_key1">>, PersistedData)),
TimeReturned = pad_zero(TimeReturned0),
?assertEqual(TsStr, TimeReturned).
t_bad_timestamp(Config) -> t_bad_timestamp(Config) ->
InfluxDBType = ?config(influxdb_type, Config), InfluxDBType = ?config(influxdb_type, Config),
InfluxDBName = ?config(influxdb_name, Config), InfluxDBName = ?config(influxdb_name, Config),

3
changes/fix-12880.en.md Normal file
View File

@ -0,0 +1,3 @@
Fixed the issue where serialization failed when the value in the tag set used a literal value (int or float) in the influxdb action configuration.
Which Tag Set value's type is always `String`. See also: [Line Protocol - Tag Set](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set)