From 1668e9ac7d734e150c1709f288df4726c47c53e9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 11 Jan 2024 18:22:50 +0800 Subject: [PATCH] fix: cannot write literal numbers to influxdb --- apps/emqx_bridge_influxdb/rebar.config | 2 +- .../src/emqx_bridge_influxdb_connector.erl | 119 ++++++++++++++---- .../test/emqx_bridge_influxdb_SUITE.erl | 1 + changes/ee/feat-12247.en.md | 1 + changes/ee/fix-12301.en.md | 1 + mix.exs | 2 +- 6 files changed, 99 insertions(+), 27 deletions(-) create mode 100644 changes/ee/feat-12247.en.md create mode 100644 changes/ee/fix-12301.en.md diff --git a/apps/emqx_bridge_influxdb/rebar.config b/apps/emqx_bridge_influxdb/rebar.config index c6ad26ac1..80d3f8dec 100644 --- a/apps/emqx_bridge_influxdb/rebar.config +++ b/apps/emqx_bridge_influxdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.13"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 7a84bc440..04fbe01c9 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -59,6 +59,11 @@ -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). +-define(IS_HTTP_ERROR(STATUS_CODE), + (is_integer(STATUS_CODE) andalso + (STATUS_CODE < 200 orelse STATUS_CODE >= 300)) +). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -541,7 +546,12 @@ reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) -> ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}), Result = {error, {unrecoverable_error, <<"authorization failure">>}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +reply_callback(ReplyFunAndArgs, {ok, Code, _, Body}) when ?IS_HTTP_ERROR(Code) -> + ?tp(influxdb_connector_do_query_failure, #{error => Body}), + Result = {error, {unrecoverable_error, Body}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); reply_callback(ReplyFunAndArgs, Result) -> + ?tp(influxdb_connector_do_query_ok, #{result => Result}), emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). %% ------------------------------------------------------------------------------------------------- @@ -711,56 +721,111 @@ time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), - case {NK0, NV} of + case {NK, NV} of {[undefined], _} -> {Data, Res}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> {Data, Res}; _ -> - NK = list_to_binary(NK0), - {Data, Res#{NK => value_type(NV)}} + {Data, Res#{ + list_to_binary(NK) => value_type(NV, tmpl_type(V)) + }} end. -value_type([Int, <<"i">>]) when - is_integer(Int) --> +value_type([Int, <<"i">>], mixed) when is_integer(Int) -> {int, Int}; -value_type([UInt, <<"u">>]) when - is_integer(UInt) --> +value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> {uint, UInt}; %% write `1`, `1.0`, `-1.0` all as float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float -value_type([Number]) when is_number(Number) -> - Number; -value_type([<<"t">>]) -> +value_type([Number], _) when is_number(Number) -> + {float, Number}; +value_type([<<"t">>], _) -> 't'; -value_type([<<"T">>]) -> +value_type([<<"T">>], _) -> 'T'; -value_type([true]) -> +value_type([true], _) -> 'true'; -value_type([<<"TRUE">>]) -> +value_type([<<"TRUE">>], _) -> 'TRUE'; -value_type([<<"True">>]) -> +value_type([<<"True">>], _) -> 'True'; -value_type([<<"f">>]) -> +value_type([<<"f">>], _) -> 'f'; -value_type([<<"F">>]) -> +value_type([<<"F">>], _) -> 'F'; -value_type([false]) -> +value_type([false], _) -> 'false'; -value_type([<<"FALSE">>]) -> +value_type([<<"FALSE">>], _) -> 'FALSE'; -value_type([<<"False">>]) -> +value_type([<<"False">>], _) -> 'False'; -value_type(Val) -> - Val. +value_type([Str], variable) when is_binary(Str) -> + Str; +value_type([Str], literal) when is_binary(Str) -> + %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint. + %% otherwise, we should convert it to float. + NumStr = binary:part(Str, 0, byte_size(Str) - 1), + case binary:part(Str, byte_size(Str), -1) of + <<"i">> -> + maybe_convert_to_integer(NumStr, Str, int); + <<"u">> -> + maybe_convert_to_integer(NumStr, Str, uint); + _ -> + maybe_convert_to_float_str(Str) + end; +value_type(Str, _) -> + list_to_binary(Str). + +tmpl_type([{str, _}]) -> + literal; +tmpl_type([{var, _}]) -> + variable; +tmpl_type(_) -> + mixed. + +maybe_convert_to_integer(NumStr, String, Type) -> + try + Int = binary_to_integer(NumStr), + {Type, Int} + catch + error:badarg -> + maybe_convert_to_integer_f(NumStr, String, Type) + end. + +maybe_convert_to_integer_f(NumStr, String, Type) -> + try + Float = binary_to_float(NumStr), + {Type, erlang:floor(Float)} + catch + error:badarg -> + String + end. + +maybe_convert_to_float_str(NumStr) -> + try + _ = binary_to_float(NumStr), + %% NOTE: return a {float, String} to avoid precision loss when converting to float + {float, NumStr} + catch + error:badarg -> + maybe_convert_to_float_str_i(NumStr) + end. + +maybe_convert_to_float_str_i(NumStr) -> + try + _ = binary_to_integer(NumStr), + {float, NumStr} + catch + error:badarg -> + NumStr + end. key_filter(undefined) -> undefined; -key_filter(Value) -> emqx_utils_conv:bin(Value). +key_filter(Value) -> bin(Value). data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; @@ -799,6 +864,10 @@ str(S) when is_list(S) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; +is_unrecoverable_error({error, {Code, _}}) when ?IS_HTTP_ERROR(Code) -> + true; +is_unrecoverable_error({error, {Code, _, _Body}}) when ?IS_HTTP_ERROR(Code) -> + true; is_unrecoverable_error(_) -> false. diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index d79139f17..edb88c72a 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -945,6 +945,7 @@ t_create_disconnected(Config) -> econnrefused -> ok; closed -> ok; {closed, _} -> ok; + {shutdown, closed} -> ok; _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) end, ok diff --git a/changes/ee/feat-12247.en.md b/changes/ee/feat-12247.en.md new file mode 100644 index 000000000..783e8e382 --- /dev/null +++ b/changes/ee/feat-12247.en.md @@ -0,0 +1 @@ +The bridges for InfluxDB have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/changes/ee/fix-12301.en.md b/changes/ee/fix-12301.en.md new file mode 100644 index 000000000..dde764015 --- /dev/null +++ b/changes/ee/fix-12301.en.md @@ -0,0 +1 @@ +Fixed issue where using line protocol to write numeric literals into InfluxDB, but the stored values end up being of string type. diff --git a/mix.exs b/mix.exs index de8195cfb..13d95ccb5 100644 --- a/mix.exs +++ b/mix.exs @@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},