diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index e17b9b3fe..94419c7d9 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -59,6 +59,9 @@ -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). +-define(set_tag, set_tag). +-define(set_field, set_field). + -define(IS_HTTP_ERROR(STATUS_CODE), (is_integer(STATUS_CODE) andalso (STATUS_CODE < 200 orelse STATUS_CODE >= 300)) @@ -710,8 +713,8 @@ line_to_point( precision := Precision } = Item ) -> - {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), - {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + {_, EncodedTags, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_tag}, Tags), + {_, EncodedFields, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_field}, Fields), maps:without([precision], Item#{ measurement => emqx_placeholder:proc_tmpl(Measurement, Data), tags => EncodedTags, @@ -727,34 +730,43 @@ time_unit(ms) -> millisecond; time_unit(us) -> microsecond; time_unit(ns) -> nanosecond. -maps_config_to_data(K, V, {Data, Res}) -> +maps_config_to_data(K, V, {Data, Res, SetType}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), NV = proc_quoted(V, Data, VTransOptions), case {NK, NV} of {[undefined], _} -> - {Data, Res}; + {Data, Res, SetType}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> - {Data, Res}; + {Data, Res, SetType}; {_, {quoted, [undefined | _]}} -> - {Data, Res}; + {Data, Res, SetType}; _ -> - {Data, Res#{ - list_to_binary(NK) => value_type(NV, tmpl_type(V)) - }} + NRes = Res#{ + list_to_binary(NK) => value_type(NV, #{ + tmpl_type => tmpl_type(V), set_type => SetType + }) + }, + {Data, NRes, SetType} end. +value_type([Number], #{set_type := ?set_tag}) when is_number(Number) -> + %% all `tag` values are treated as string + %% See also: https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set + emqx_utils_conv:bin(Number); +value_type([Str], #{set_type := ?set_tag}) when is_binary(Str) -> + Str; value_type({quoted, ValList}, _) -> {string_list, ValList}; -value_type([Int, <<"i">>], mixed) when is_integer(Int) -> +value_type([Int, <<"i">>], #{tmpl_type := mixed}) when is_integer(Int) -> {int, Int}; -value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> +value_type([UInt, <<"u">>], #{tmpl_type := mixed}) when is_integer(UInt) -> {uint, UInt}; %% write `1`, `1.0`, `-1.0` all as float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float -value_type([Number], _) when is_number(Number) -> +value_type([Number], #{set_type := ?set_field}) when is_number(Number) -> {float, Number}; value_type([<<"t">>], _) -> 't'; @@ -776,9 +788,9 @@ value_type([<<"FALSE">>], _) -> 'FALSE'; value_type([<<"False">>], _) -> 'False'; -value_type([Str], variable) when is_binary(Str) -> +value_type([Str], #{tmpl_type := variable}) when is_binary(Str) -> Str; -value_type([Str], literal) when is_binary(Str) -> +value_type([Str], #{tmpl_type := literal, set_type := ?set_field}) when is_binary(Str) -> %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint. %% otherwise, we should convert it to float. NumStr = binary:part(Str, 0, byte_size(Str) - 1),