diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 1fe5b4f78..be5ed6b1c 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -638,8 +638,10 @@ value_type([UInt, <<"u">>]) when is_integer(UInt) -> {uint, UInt}; -value_type([Float]) when is_float(Float) -> - Float; +%% write `1`, `1.0`, `-1.0` all as float +%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float +value_type([Number]) when is_number(Number) -> + Number; value_type([<<"t">>]) -> 't'; value_type([<<"T">>]) -> diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index f97e5e977..3976d187a 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -454,24 +454,26 @@ query_by_clientid(ClientId, Config) -> {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), DecodedCSV1 = [ [Field || Field <- Line, Field =/= <<>>] - || Line <- DecodedCSV0, - Line =/= [<<>>] + || Line <- DecodedCSV0, Line =/= [<<>>] ], - DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []), + DecodedCSV2 = csv_lines_to_maps(DecodedCSV1), index_by_field(DecodedCSV2). -decode_csv(RawBody) -> - Lines = - [ - binary:split(Line, [<<";">>], [global, trim_all]) - || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all]) - ], - csv_lines_to_maps(Lines, []). +csv_lines_to_maps([Title | Rest]) -> + csv_lines_to_maps(Rest, Title, _Acc = []); +csv_lines_to_maps([]) -> + []. -csv_lines_to_maps([Fields, Data | Rest], Acc) -> - Map = maps:from_list(lists:zip(Fields, Data)), - csv_lines_to_maps(Rest, [Map | Acc]); -csv_lines_to_maps(_Data, Acc) -> +csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) -> + Map = maps:from_list(lists:zip(Title, Data)), + csv_lines_to_maps(RestData, Title, [Map | Acc]); +%% ignore the csv title line +%% it's always like this: +%% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>, +%% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement], +csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) -> + csv_lines_to_maps(RestData, Title, Acc); +csv_lines_to_maps([], _Title, Acc) -> lists:reverse(Acc). index_by_field(DecodedCSV) -> @@ -768,6 +770,53 @@ t_boolean_variants(Config) -> ), ok. +t_any_num_as_float(Config) -> + QueryMode = ?config(query_mode, Config), + Const = erlang:system_time(nanosecond), + ConstBin = integer_to_binary(Const), + TsStr = iolist_to_binary( + calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}]) + ), + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"write_syntax">> => + <<"mqtt,clientid=${clientid}", " ", + "float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ", + ConstBin/binary>> + } + ) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + %% no decimal point + float_no_dp => 123, + %% with decimal point + float_dp => 123.0 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) + }, + case QueryMode of + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)), + ok; + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500) + end, + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>}, + assert_persisted_data(ClientId, Expected, PersistedData), + TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"float_no_dp">>, PersistedData)), + TimeReturned = pad_zero(TimeReturned0), + ?assertEqual(TsStr, TimeReturned). + t_bad_timestamp(Config) -> InfluxDBType = ?config(influxdb_type, Config), InfluxDBName = ?config(influxdb_name, Config), diff --git a/changes/ee/fix-11223.en.md b/changes/ee/fix-11223.en.md new file mode 100644 index 000000000..6d97746be --- /dev/null +++ b/changes/ee/fix-11223.en.md @@ -0,0 +1,5 @@ +In InfluxDB bridging, if intend to write using the float data type but the placeholder represents the original value +as an integer without a decimal point during serialization, it will result in the failure of Influx Line Protocol serialization +and the inability to write to the InfluxDB bridge. + +See also: [InfluxDB v2.7 Line-Protocol](https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float)