diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index e17b9b3fe..94419c7d9 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -59,6 +59,9 @@ -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). +-define(set_tag, set_tag). +-define(set_field, set_field). + -define(IS_HTTP_ERROR(STATUS_CODE), (is_integer(STATUS_CODE) andalso (STATUS_CODE < 200 orelse STATUS_CODE >= 300)) @@ -710,8 +713,8 @@ line_to_point( precision := Precision } = Item ) -> - {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), - {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + {_, EncodedTags, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_tag}, Tags), + {_, EncodedFields, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_field}, Fields), maps:without([precision], Item#{ measurement => emqx_placeholder:proc_tmpl(Measurement, Data), tags => EncodedTags, @@ -727,34 +730,43 @@ time_unit(ms) -> millisecond; time_unit(us) -> microsecond; time_unit(ns) -> nanosecond. -maps_config_to_data(K, V, {Data, Res}) -> +maps_config_to_data(K, V, {Data, Res, SetType}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), NV = proc_quoted(V, Data, VTransOptions), case {NK, NV} of {[undefined], _} -> - {Data, Res}; + {Data, Res, SetType}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> - {Data, Res}; + {Data, Res, SetType}; {_, {quoted, [undefined | _]}} -> - {Data, Res}; + {Data, Res, SetType}; _ -> - {Data, Res#{ - list_to_binary(NK) => value_type(NV, tmpl_type(V)) - }} + NRes = Res#{ + list_to_binary(NK) => value_type(NV, #{ + tmpl_type => tmpl_type(V), set_type => SetType + }) + }, + {Data, NRes, SetType} end. +value_type([Number], #{set_type := ?set_tag}) when is_number(Number) -> + %% all `tag` values are treated as string + %% See also: https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set + emqx_utils_conv:bin(Number); +value_type([Str], #{set_type := ?set_tag}) when is_binary(Str) -> + Str; value_type({quoted, ValList}, _) -> {string_list, ValList}; -value_type([Int, <<"i">>], mixed) when is_integer(Int) -> +value_type([Int, <<"i">>], #{tmpl_type := mixed}) when is_integer(Int) -> {int, Int}; -value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> +value_type([UInt, <<"u">>], #{tmpl_type := mixed}) when is_integer(UInt) -> {uint, UInt}; %% write `1`, `1.0`, `-1.0` all as float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float -value_type([Number], _) when is_number(Number) -> +value_type([Number], #{set_type := ?set_field}) when is_number(Number) -> {float, Number}; value_type([<<"t">>], _) -> 't'; @@ -776,9 +788,9 @@ value_type([<<"FALSE">>], _) -> 'FALSE'; value_type([<<"False">>], _) -> 'False'; -value_type([Str], variable) when is_binary(Str) -> +value_type([Str], #{tmpl_type := variable}) when is_binary(Str) -> Str; -value_type([Str], literal) when is_binary(Str) -> +value_type([Str], #{tmpl_type := literal, set_type := ?set_field}) when is_binary(Str) -> %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint. %% otherwise, we should convert it to float. NumStr = binary:part(Str, 0, byte_size(Str) - 1), diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index e30e8b361..3d50282ab 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -864,6 +864,53 @@ t_any_num_as_float(Config) -> TimeReturned = pad_zero(TimeReturned0), ?assertEqual(TsStr, TimeReturned). +t_tag_set_use_literal_value(Config) -> + QueryMode = ?config(query_mode, Config), + Const = erlang:system_time(nanosecond), + ConstBin = integer_to_binary(Const), + TsStr = iolist_to_binary( + calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}]) + ), + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"write_syntax">> => + <<"mqtt,clientid=${clientid},tag_key1=100,tag_key2=123.4,tag_key3=66i,tag_key4=${payload.float_dp}", + " ", + "field_key1=100.1,field_key2=100i,field_key3=${payload.float_dp},bar=5i", + " ", ConstBin/binary>> + } + ) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + %% with decimal point + float_dp => 123.4 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) + }, + case QueryMode of + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)), + ok; + async -> + ?assertMatch(ok, send_message(Config, SentData)) + end, + %% sleep is still need even in sync mode, or we would get an empty result sometimes + ct:sleep(1500), + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{field_key1 => <<"100.1">>, field_key2 => <<"100">>, field_key3 => <<"123.4">>}, + assert_persisted_data(ClientId, Expected, PersistedData), + TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"field_key1">>, PersistedData)), + TimeReturned = pad_zero(TimeReturned0), + ?assertEqual(TsStr, TimeReturned). + t_bad_timestamp(Config) -> InfluxDBType = ?config(influxdb_type, Config), InfluxDBName = ?config(influxdb_name, Config), diff --git a/changes/fix-12880.en.md b/changes/fix-12880.en.md new file mode 100644 index 000000000..7d7a53777 --- /dev/null +++ b/changes/fix-12880.en.md @@ -0,0 +1,3 @@ +Fixed the issue where serialization failed when the value in the tag set used a literal value (int or float) in the influxdb action configuration. + +Which Tag Set value's type is always `String`. See also: [Line Protocol - Tag Set](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set)