diff --git a/apps/emqx_bridge_influxdb/rebar.config b/apps/emqx_bridge_influxdb/rebar.config index c6ad26ac1..80d3f8dec 100644 --- a/apps/emqx_bridge_influxdb/rebar.config +++ b/apps/emqx_bridge_influxdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.13"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index 4228d23d5..4fb7dbae7 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -382,7 +382,7 @@ field(Line) -> field_val([$" | Line]) -> {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []), %% Quoted val can be empty - {Val, strip_l(Line1, ?VAL_SEP)}; + {{quoted, Val}, strip_l(Line1, ?VAL_SEP)}; field_val(Line) -> %% Unquoted value should not be un-escaped according to InfluxDB protocol, %% as it can only hold float, integer, uinteger or boolean value. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 7a84bc440..478486e5b 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -59,6 +59,11 @@ -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). +-define(IS_HTTP_ERROR(STATUS_CODE), + (is_integer(STATUS_CODE) andalso + (STATUS_CODE < 200 orelse STATUS_CODE >= 300)) +). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -541,7 +546,12 @@ reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) -> ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}), Result = {error, {unrecoverable_error, <<"authorization failure">>}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +reply_callback(ReplyFunAndArgs, {ok, Code, _, Body}) when ?IS_HTTP_ERROR(Code) -> + ?tp(influxdb_connector_do_query_failure, #{error => Body}), + Result = {error, {unrecoverable_error, Body}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); reply_callback(ReplyFunAndArgs, Result) -> + ?tp(influxdb_connector_do_query_ok, #{result => Result}), emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). %% ------------------------------------------------------------------------------------------------- @@ -589,8 +599,17 @@ to_kv_config(KVfields) -> to_maps_config(K, V, Res) -> NK = emqx_placeholder:preproc_tmpl(bin(K)), - NV = emqx_placeholder:preproc_tmpl(bin(V)), - Res#{NK => NV}. + Res#{NK => preproc_quoted(V)}. + +preproc_quoted({quoted, V}) -> + {quoted, emqx_placeholder:preproc_tmpl(bin(V))}; +preproc_quoted(V) -> + emqx_placeholder:preproc_tmpl(bin(V)). + +proc_quoted({quoted, V}, Data, TransOpts) -> + {quoted, emqx_placeholder:proc_tmpl(V, Data, TransOpts)}; +proc_quoted(V, Data, TransOpts) -> + emqx_placeholder:proc_tmpl(V, Data, TransOpts). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans @@ -711,56 +730,115 @@ time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), - NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), - case {NK0, NV} of + NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NV = proc_quoted(V, Data, VTransOptions), + case {NK, NV} of {[undefined], _} -> {Data, Res}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> {Data, Res}; + {_, {quoted, [undefined | _]}} -> + {Data, Res}; _ -> - NK = list_to_binary(NK0), - {Data, Res#{NK => value_type(NV)}} + {Data, Res#{ + list_to_binary(NK) => value_type(NV, tmpl_type(V)) + }} end. -value_type([Int, <<"i">>]) when - is_integer(Int) --> +value_type({quoted, ValList}, _) -> + {string_list, ValList}; +value_type([Int, <<"i">>], mixed) when is_integer(Int) -> {int, Int}; -value_type([UInt, <<"u">>]) when - is_integer(UInt) --> +value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> {uint, UInt}; %% write `1`, `1.0`, `-1.0` all as float %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float -value_type([Number]) when is_number(Number) -> - Number; -value_type([<<"t">>]) -> +value_type([Number], _) when is_number(Number) -> + {float, Number}; +value_type([<<"t">>], _) -> 't'; -value_type([<<"T">>]) -> +value_type([<<"T">>], _) -> 'T'; -value_type([true]) -> +value_type([true], _) -> 'true'; -value_type([<<"TRUE">>]) -> +value_type([<<"TRUE">>], _) -> 'TRUE'; -value_type([<<"True">>]) -> +value_type([<<"True">>], _) -> 'True'; -value_type([<<"f">>]) -> +value_type([<<"f">>], _) -> 'f'; -value_type([<<"F">>]) -> +value_type([<<"F">>], _) -> 'F'; -value_type([false]) -> +value_type([false], _) -> 'false'; -value_type([<<"FALSE">>]) -> +value_type([<<"FALSE">>], _) -> 'FALSE'; -value_type([<<"False">>]) -> +value_type([<<"False">>], _) -> 'False'; -value_type(Val) -> - Val. +value_type([Str], variable) when is_binary(Str) -> + Str; +value_type([Str], literal) when is_binary(Str) -> + %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint. + %% otherwise, we should convert it to float. + NumStr = binary:part(Str, 0, byte_size(Str) - 1), + case binary:part(Str, byte_size(Str), -1) of + <<"i">> -> + maybe_convert_to_integer(NumStr, Str, int); + <<"u">> -> + maybe_convert_to_integer(NumStr, Str, uint); + _ -> + maybe_convert_to_float_str(Str) + end; +value_type(Str, _) -> + Str. + +tmpl_type([{str, _}]) -> + literal; +tmpl_type([{var, _}]) -> + variable; +tmpl_type(_) -> + mixed. + +maybe_convert_to_integer(NumStr, String, Type) -> + try + Int = binary_to_integer(NumStr), + {Type, Int} + catch + error:badarg -> + maybe_convert_to_integer_f(NumStr, String, Type) + end. + +maybe_convert_to_integer_f(NumStr, String, Type) -> + try + Float = binary_to_float(NumStr), + {Type, erlang:floor(Float)} + catch + error:badarg -> + String + end. + +maybe_convert_to_float_str(NumStr) -> + try + _ = binary_to_float(NumStr), + %% NOTE: return a {float, String} to avoid precision loss when converting to float + {float, NumStr} + catch + error:badarg -> + maybe_convert_to_float_str_i(NumStr) + end. + +maybe_convert_to_float_str_i(NumStr) -> + try + _ = binary_to_integer(NumStr), + {float, NumStr} + catch + error:badarg -> + NumStr + end. key_filter(undefined) -> undefined; -key_filter(Value) -> emqx_utils_conv:bin(Value). +key_filter(Value) -> bin(Value). data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; @@ -799,6 +877,10 @@ str(S) when is_list(S) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; +is_unrecoverable_error({error, {Code, _}}) when ?IS_HTTP_ERROR(Code) -> + true; +is_unrecoverable_error({error, {Code, _, _Body}}) when ?IS_HTTP_ERROR(Code) -> + true; is_unrecoverable_error(_) -> false. diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index d79139f17..ee806d826 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -445,6 +445,7 @@ query_by_clientid(ClientId, Config) -> query => Query, dialect => #{ header => true, + annotations => [<<"datatype">>], delimiter => <<";">> } }), @@ -456,6 +457,7 @@ query_by_clientid(ClientId, Config) -> _Timeout = 10_000, _Retry = 0 ), + %ct:pal("raw body: ~p", [RawBody0]), RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)), {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), DecodedCSV1 = [ @@ -465,21 +467,26 @@ query_by_clientid(ClientId, Config) -> DecodedCSV2 = csv_lines_to_maps(DecodedCSV1), index_by_field(DecodedCSV2). -csv_lines_to_maps([Title | Rest]) -> - csv_lines_to_maps(Rest, Title, _Acc = []); +csv_lines_to_maps([[<<"#datatype">> | DataType], Title | Rest]) -> + csv_lines_to_maps(Rest, Title, _Acc = [], DataType); csv_lines_to_maps([]) -> []. -csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) -> +csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc, DataType) -> + %ct:pal("data: ~p, title: ~p, datatype: ~p", [Data, Title, DataType]), Map = maps:from_list(lists:zip(Title, Data)), - csv_lines_to_maps(RestData, Title, [Map | Acc]); + MapT = lists:zip(Title, DataType), + [Type] = [T || {<<"_value">>, T} <- MapT], + csv_lines_to_maps(RestData, Title, [Map#{'_value_type' => Type} | Acc], DataType); %% ignore the csv title line %% it's always like this: %% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>, %% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement], -csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) -> - csv_lines_to_maps(RestData, Title, Acc); -csv_lines_to_maps([], _Title, Acc) -> +csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc, DataType) -> + csv_lines_to_maps(RestData, Title, Acc, DataType); +csv_lines_to_maps([[<<"#datatype">> | DataType] | RestData], Title, Acc, _) -> + csv_lines_to_maps(RestData, Title, Acc, DataType); +csv_lines_to_maps([], _Title, Acc, _DataType) -> lists:reverse(Acc). index_by_field(DecodedCSV) -> @@ -494,11 +501,21 @@ assert_persisted_data(ClientId, Expected, PersistedData) -> #{<<"_value">> := ExpectedValue}, maps:get(ClientIdIntKey, PersistedData) ); + (Key, {ExpectedValue, ExpectedType}) -> + ?assertMatch( + #{<<"_value">> := ExpectedValue, '_value_type' := ExpectedType}, + maps:get(atom_to_binary(Key), PersistedData), + #{ + key => Key, + expected_value => ExpectedValue, + expected_data_type => ExpectedType + } + ); (Key, ExpectedValue) -> ?assertMatch( #{<<"_value">> := ExpectedValue}, maps:get(atom_to_binary(Key), PersistedData), - #{expected => ExpectedValue} + #{key => Key, expected_value => ExpectedValue} ) end, Expected @@ -689,7 +706,15 @@ t_const_timestamp(Config) -> Config, #{ <<"write_syntax">> => - <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>> + << + "mqtt,clientid=${clientid} " + "foo=${payload.foo}i," + "foo1=${payload.foo}," + "foo2=\"${payload.foo}\"," + "foo3=\"${payload.foo}somestr\"," + "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ", + ConstBin/binary + >> } ) ), @@ -709,7 +734,18 @@ t_const_timestamp(Config) -> end, ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), - Expected = #{foo => <<"123">>}, + Expected = #{ + foo => {<<"123">>, <<"long">>}, + foo1 => {<<"123">>, <<"double">>}, + foo2 => {<<"123">>, <<"string">>}, + foo3 => {<<"123somestr">>, <<"string">>}, + bar => {<<"5">>, <<"long">>}, + baz0 => {<<"1.1">>, <<"double">>}, + baz1 => {<<"a">>, <<"string">>}, + baz2 => {<<"ai">>, <<"string">>}, + baz3 => {<<"au">>, <<"string">>}, + baz4 => {<<"1u">>, <<"string">>} + }, assert_persisted_data(ClientId, Expected, PersistedData), TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), TimeReturned = pad_zero(TimeReturned0), @@ -945,6 +981,7 @@ t_create_disconnected(Config) -> econnrefused -> ok; closed -> ok; {closed, _} -> ok; + {shutdown, closed} -> ok; _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) end, ok diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl index 9ad685f77..6ddf42f2f 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl @@ -102,27 +102,51 @@ #{ measurement => "m7", tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}], + fields => [ + {"field", {quoted, "field7"}}, + {"field_a", "field7a"}, + {"field_b", {quoted, "field7b"}} + ], timestamp => undefined }}, {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}", #{ measurement => "m8", tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}], + fields => [ + {"field", {quoted, "field8"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "field8b"}} + ], timestamp => "${timestamp8}" }}, + { + "m8a,tag=tag8,tag_a=\"${tag8a}\",tag_b=tag8b field=\"${field8}\"," + "field_a=field8a,field_b=\"${field8b}\" ${timestamp8}", + #{ + measurement => "m8a", + tags => [{"tag", "tag8"}, {"tag_a", "\"${tag8a}\""}, {"tag_b", "tag8b"}], + fields => [ + {"field", {quoted, "${field8}"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "${field8b}"}} + ], + timestamp => "${timestamp8}" + } + }, {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", #{ measurement => "m9", tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + fields => [ + {"field", {quoted, "field9"}}, {"field_a", "field9a"}, {"field_b", {quoted, ""}} + ], timestamp => "${timestamp9}" }}, {"m10 field=\"\" ${timestamp10}", #{ measurement => "m10", tags => [], - fields => [{"field", ""}], + fields => [{"field", {quoted, ""}}], timestamp => "${timestamp10}" }} ]). @@ -177,19 +201,19 @@ {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{ measurement => "m2", tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], + fields => [{"field", {quoted, "field \"2\",\n"}}], timestamp => undefined }}, {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{ measurement => "m 3", tags => [], - fields => [{"field", "field3"}], + fields => [{"field", {quoted, "field3"}}], timestamp => "${payload.timestamp 3}" }}, {"m4 field=\"\\\"field\\\\4\\\"\"", #{ measurement => "m4", tags => [], - fields => [{"field", "\"field\\4\""}], + fields => [{"field", {quoted, "\"field\\4\""}}], timestamp => undefined }}, { @@ -208,7 +232,11 @@ #{ measurement => "m6", tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + fields => [ + {"field", {quoted, "field6"}}, + {"field_a", {quoted, "field6a"}}, + {"field_b", {quoted, "field6b"}} + ], timestamp => undefined }}, { @@ -217,7 +245,11 @@ #{ measurement => " m7 ", tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}], + fields => [ + {"field", {quoted, "field7"}}, + {"field_a", "field7a"}, + {"field_b", {quoted, "field7b\\\n"}} + ], timestamp => undefined } }, @@ -227,7 +259,11 @@ #{ measurement => "m8", tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}], + fields => [ + {"field", {quoted, "field8"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "\"field\" = 8b"}} + ], timestamp => "${timestamp8}" } }, @@ -235,14 +271,18 @@ #{ measurement => "m\\9", tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + fields => [ + {"field=field", {quoted, "field9"}}, + {"field_a", "field9a"}, + {"field_b", {quoted, ""}} + ], timestamp => "${timestamp9}" }}, {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{ measurement => "m,10", tags => [], %% backslash should not be un-escaped in tag key - fields => [{"\"field\\\\\"", ""}], + fields => [{"\"field\\\\\"", {quoted, ""}}], timestamp => "${timestamp10}" }} ]). @@ -257,19 +297,19 @@ {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{ measurement => "m2", tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], + fields => [{"field", {quoted, "field \"2\",\n"}}], timestamp => undefined }}, {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{ measurement => "m 3", tags => [], - fields => [{"field", "field3"}], + fields => [{"field", {quoted, "field3"}}], timestamp => "${payload.timestamp 3}" }}, {" m4 field=\"\\\"field\\\\4\\\"\" ", #{ measurement => "m4", tags => [], - fields => [{"field", "\"field\\4\""}], + fields => [{"field", {quoted, "\"field\\4\""}}], timestamp => undefined }}, { @@ -288,7 +328,11 @@ #{ measurement => "m6", tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + fields => [ + {"field", {quoted, "field6"}}, + {"field_a", {quoted, "field6a"}}, + {"field_b", {quoted, "field6b"}} + ], timestamp => undefined }} ]). diff --git a/changes/ee/feat-12247.en.md b/changes/ee/feat-12247.en.md new file mode 100644 index 000000000..783e8e382 --- /dev/null +++ b/changes/ee/feat-12247.en.md @@ -0,0 +1 @@ +The bridges for InfluxDB have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/changes/ee/fix-12301.en.md b/changes/ee/fix-12301.en.md new file mode 100644 index 000000000..dde764015 --- /dev/null +++ b/changes/ee/fix-12301.en.md @@ -0,0 +1 @@ +Fixed issue where using line protocol to write numeric literals into InfluxDB, but the stored values end up being of string type. diff --git a/mix.exs b/mix.exs index de8195cfb..13d95ccb5 100644 --- a/mix.exs +++ b/mix.exs @@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},