fix: cannot write literal numbers to influxdb

This commit is contained in:
Shawn 2024-01-11 18:22:50 +08:00
parent c4b778b592
commit 1668e9ac7d
6 changed files with 99 additions and 27 deletions

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}}, {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.13"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -59,6 +59,11 @@
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
-define(IS_HTTP_ERROR(STATUS_CODE),
(is_integer(STATUS_CODE) andalso
(STATUS_CODE < 200 orelse STATUS_CODE >= 300))
).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
@ -541,7 +546,12 @@ reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}), ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}}, Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, {ok, Code, _, Body}) when ?IS_HTTP_ERROR(Code) ->
?tp(influxdb_connector_do_query_failure, #{error => Body}),
Result = {error, {unrecoverable_error, Body}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, Result) -> reply_callback(ReplyFunAndArgs, Result) ->
?tp(influxdb_connector_do_query_ok, #{result => Result}),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -711,56 +721,111 @@ time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
case {NK0, NV} of case {NK, NV} of
{[undefined], _} -> {[undefined], _} ->
{Data, Res}; {Data, Res};
%% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
{_, [undefined | _]} -> {_, [undefined | _]} ->
{Data, Res}; {Data, Res};
_ -> _ ->
NK = list_to_binary(NK0), {Data, Res#{
{Data, Res#{NK => value_type(NV)}} list_to_binary(NK) => value_type(NV, tmpl_type(V))
}}
end. end.
value_type([Int, <<"i">>]) when value_type([Int, <<"i">>], mixed) when is_integer(Int) ->
is_integer(Int)
->
{int, Int}; {int, Int};
value_type([UInt, <<"u">>]) when value_type([UInt, <<"u">>], mixed) when is_integer(UInt) ->
is_integer(UInt)
->
{uint, UInt}; {uint, UInt};
%% write `1`, `1.0`, `-1.0` all as float %% write `1`, `1.0`, `-1.0` all as float
%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
value_type([Number]) when is_number(Number) -> value_type([Number], _) when is_number(Number) ->
Number; {float, Number};
value_type([<<"t">>]) -> value_type([<<"t">>], _) ->
't'; 't';
value_type([<<"T">>]) -> value_type([<<"T">>], _) ->
'T'; 'T';
value_type([true]) -> value_type([true], _) ->
'true'; 'true';
value_type([<<"TRUE">>]) -> value_type([<<"TRUE">>], _) ->
'TRUE'; 'TRUE';
value_type([<<"True">>]) -> value_type([<<"True">>], _) ->
'True'; 'True';
value_type([<<"f">>]) -> value_type([<<"f">>], _) ->
'f'; 'f';
value_type([<<"F">>]) -> value_type([<<"F">>], _) ->
'F'; 'F';
value_type([false]) -> value_type([false], _) ->
'false'; 'false';
value_type([<<"FALSE">>]) -> value_type([<<"FALSE">>], _) ->
'FALSE'; 'FALSE';
value_type([<<"False">>]) -> value_type([<<"False">>], _) ->
'False'; 'False';
value_type(Val) -> value_type([Str], variable) when is_binary(Str) ->
Val. Str;
value_type([Str], literal) when is_binary(Str) ->
%% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint.
%% otherwise, we should convert it to float.
NumStr = binary:part(Str, 0, byte_size(Str) - 1),
case binary:part(Str, byte_size(Str), -1) of
<<"i">> ->
maybe_convert_to_integer(NumStr, Str, int);
<<"u">> ->
maybe_convert_to_integer(NumStr, Str, uint);
_ ->
maybe_convert_to_float_str(Str)
end;
value_type(Str, _) ->
list_to_binary(Str).
tmpl_type([{str, _}]) ->
literal;
tmpl_type([{var, _}]) ->
variable;
tmpl_type(_) ->
mixed.
maybe_convert_to_integer(NumStr, String, Type) ->
try
Int = binary_to_integer(NumStr),
{Type, Int}
catch
error:badarg ->
maybe_convert_to_integer_f(NumStr, String, Type)
end.
maybe_convert_to_integer_f(NumStr, String, Type) ->
try
Float = binary_to_float(NumStr),
{Type, erlang:floor(Float)}
catch
error:badarg ->
String
end.
maybe_convert_to_float_str(NumStr) ->
try
_ = binary_to_float(NumStr),
%% NOTE: return a {float, String} to avoid precision loss when converting to float
{float, NumStr}
catch
error:badarg ->
maybe_convert_to_float_str_i(NumStr)
end.
maybe_convert_to_float_str_i(NumStr) ->
try
_ = binary_to_integer(NumStr),
{float, NumStr}
catch
error:badarg ->
NumStr
end.
key_filter(undefined) -> undefined; key_filter(undefined) -> undefined;
key_filter(Value) -> emqx_utils_conv:bin(Value). key_filter(Value) -> bin(Value).
data_filter(undefined) -> undefined; data_filter(undefined) -> undefined;
data_filter(Int) when is_integer(Int) -> Int; data_filter(Int) when is_integer(Int) -> Int;
@ -799,6 +864,10 @@ str(S) when is_list(S) ->
is_unrecoverable_error({error, {unrecoverable_error, _}}) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true; true;
is_unrecoverable_error({error, {Code, _}}) when ?IS_HTTP_ERROR(Code) ->
true;
is_unrecoverable_error({error, {Code, _, _Body}}) when ?IS_HTTP_ERROR(Code) ->
true;
is_unrecoverable_error(_) -> is_unrecoverable_error(_) ->
false. false.

View File

@ -945,6 +945,7 @@ t_create_disconnected(Config) ->
econnrefused -> ok; econnrefused -> ok;
closed -> ok; closed -> ok;
{closed, _} -> ok; {closed, _} -> ok;
{shutdown, closed} -> ok;
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
end, end,
ok ok

View File

@ -0,0 +1 @@
The bridges for InfluxDB have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.

View File

@ -0,0 +1 @@
Fixed issue where using line protocol to write numeric literals into InfluxDB, but the stored values end up being of string type.

View File

@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[ [
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},