diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index 4228d23d5..4fb7dbae7 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -382,7 +382,7 @@ field(Line) -> field_val([$" | Line]) -> {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []), %% Quoted val can be empty - {Val, strip_l(Line1, ?VAL_SEP)}; + {{quoted, Val}, strip_l(Line1, ?VAL_SEP)}; field_val(Line) -> %% Unquoted value should not be un-escaped according to InfluxDB protocol, %% as it can only hold float, integer, uinteger or boolean value. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 04fbe01c9..478486e5b 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -599,8 +599,17 @@ to_kv_config(KVfields) -> to_maps_config(K, V, Res) -> NK = emqx_placeholder:preproc_tmpl(bin(K)), - NV = emqx_placeholder:preproc_tmpl(bin(V)), - Res#{NK => NV}. + Res#{NK => preproc_quoted(V)}. + +preproc_quoted({quoted, V}) -> + {quoted, emqx_placeholder:preproc_tmpl(bin(V))}; +preproc_quoted(V) -> + emqx_placeholder:preproc_tmpl(bin(V)). + +proc_quoted({quoted, V}, Data, TransOpts) -> + {quoted, emqx_placeholder:proc_tmpl(V, Data, TransOpts)}; +proc_quoted(V, Data, TransOpts) -> + emqx_placeholder:proc_tmpl(V, Data, TransOpts). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans @@ -722,19 +731,23 @@ maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), - NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), + NV = proc_quoted(V, Data, VTransOptions), case {NK, NV} of {[undefined], _} -> {Data, Res}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> {Data, Res}; + {_, {quoted, [undefined | _]}} -> + {Data, Res}; _ -> {Data, Res#{ list_to_binary(NK) => value_type(NV, tmpl_type(V)) }} end. +value_type({quoted, ValList}, _) -> + {string_list, ValList}; value_type([Int, <<"i">>], mixed) when is_integer(Int) -> {int, Int}; value_type([UInt, <<"u">>], mixed) when is_integer(UInt) -> @@ -778,7 +791,7 @@ value_type([Str], literal) when is_binary(Str) -> maybe_convert_to_float_str(Str) end; value_type(Str, _) -> - list_to_binary(Str). + Str. tmpl_type([{str, _}]) -> literal; diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index edb88c72a..ee806d826 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -445,6 +445,7 @@ query_by_clientid(ClientId, Config) -> query => Query, dialect => #{ header => true, + annotations => [<<"datatype">>], delimiter => <<";">> } }), @@ -456,6 +457,7 @@ query_by_clientid(ClientId, Config) -> _Timeout = 10_000, _Retry = 0 ), + %ct:pal("raw body: ~p", [RawBody0]), RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)), {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), DecodedCSV1 = [ @@ -465,21 +467,26 @@ query_by_clientid(ClientId, Config) -> DecodedCSV2 = csv_lines_to_maps(DecodedCSV1), index_by_field(DecodedCSV2). -csv_lines_to_maps([Title | Rest]) -> - csv_lines_to_maps(Rest, Title, _Acc = []); +csv_lines_to_maps([[<<"#datatype">> | DataType], Title | Rest]) -> + csv_lines_to_maps(Rest, Title, _Acc = [], DataType); csv_lines_to_maps([]) -> []. -csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) -> +csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc, DataType) -> + %ct:pal("data: ~p, title: ~p, datatype: ~p", [Data, Title, DataType]), Map = maps:from_list(lists:zip(Title, Data)), - csv_lines_to_maps(RestData, Title, [Map | Acc]); + MapT = lists:zip(Title, DataType), + [Type] = [T || {<<"_value">>, T} <- MapT], + csv_lines_to_maps(RestData, Title, [Map#{'_value_type' => Type} | Acc], DataType); %% ignore the csv title line %% it's always like this: %% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>, %% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement], -csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) -> - csv_lines_to_maps(RestData, Title, Acc); -csv_lines_to_maps([], _Title, Acc) -> +csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc, DataType) -> + csv_lines_to_maps(RestData, Title, Acc, DataType); +csv_lines_to_maps([[<<"#datatype">> | DataType] | RestData], Title, Acc, _) -> + csv_lines_to_maps(RestData, Title, Acc, DataType); +csv_lines_to_maps([], _Title, Acc, _DataType) -> lists:reverse(Acc). index_by_field(DecodedCSV) -> @@ -494,11 +501,21 @@ assert_persisted_data(ClientId, Expected, PersistedData) -> #{<<"_value">> := ExpectedValue}, maps:get(ClientIdIntKey, PersistedData) ); + (Key, {ExpectedValue, ExpectedType}) -> + ?assertMatch( + #{<<"_value">> := ExpectedValue, '_value_type' := ExpectedType}, + maps:get(atom_to_binary(Key), PersistedData), + #{ + key => Key, + expected_value => ExpectedValue, + expected_data_type => ExpectedType + } + ); (Key, ExpectedValue) -> ?assertMatch( #{<<"_value">> := ExpectedValue}, maps:get(atom_to_binary(Key), PersistedData), - #{expected => ExpectedValue} + #{key => Key, expected_value => ExpectedValue} ) end, Expected @@ -689,7 +706,15 @@ t_const_timestamp(Config) -> Config, #{ <<"write_syntax">> => - <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>> + << + "mqtt,clientid=${clientid} " + "foo=${payload.foo}i," + "foo1=${payload.foo}," + "foo2=\"${payload.foo}\"," + "foo3=\"${payload.foo}somestr\"," + "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ", + ConstBin/binary + >> } ) ), @@ -709,7 +734,18 @@ t_const_timestamp(Config) -> end, ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), - Expected = #{foo => <<"123">>}, + Expected = #{ + foo => {<<"123">>, <<"long">>}, + foo1 => {<<"123">>, <<"double">>}, + foo2 => {<<"123">>, <<"string">>}, + foo3 => {<<"123somestr">>, <<"string">>}, + bar => {<<"5">>, <<"long">>}, + baz0 => {<<"1.1">>, <<"double">>}, + baz1 => {<<"a">>, <<"string">>}, + baz2 => {<<"ai">>, <<"string">>}, + baz3 => {<<"au">>, <<"string">>}, + baz4 => {<<"1u">>, <<"string">>} + }, assert_persisted_data(ClientId, Expected, PersistedData), TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), TimeReturned = pad_zero(TimeReturned0), diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl index 9ad685f77..6ddf42f2f 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl @@ -102,27 +102,51 @@ #{ measurement => "m7", tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}], + fields => [ + {"field", {quoted, "field7"}}, + {"field_a", "field7a"}, + {"field_b", {quoted, "field7b"}} + ], timestamp => undefined }}, {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}", #{ measurement => "m8", tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}], + fields => [ + {"field", {quoted, "field8"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "field8b"}} + ], timestamp => "${timestamp8}" }}, + { + "m8a,tag=tag8,tag_a=\"${tag8a}\",tag_b=tag8b field=\"${field8}\"," + "field_a=field8a,field_b=\"${field8b}\" ${timestamp8}", + #{ + measurement => "m8a", + tags => [{"tag", "tag8"}, {"tag_a", "\"${tag8a}\""}, {"tag_b", "tag8b"}], + fields => [ + {"field", {quoted, "${field8}"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "${field8b}"}} + ], + timestamp => "${timestamp8}" + } + }, {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", #{ measurement => "m9", tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + fields => [ + {"field", {quoted, "field9"}}, {"field_a", "field9a"}, {"field_b", {quoted, ""}} + ], timestamp => "${timestamp9}" }}, {"m10 field=\"\" ${timestamp10}", #{ measurement => "m10", tags => [], - fields => [{"field", ""}], + fields => [{"field", {quoted, ""}}], timestamp => "${timestamp10}" }} ]). @@ -177,19 +201,19 @@ {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{ measurement => "m2", tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], + fields => [{"field", {quoted, "field \"2\",\n"}}], timestamp => undefined }}, {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{ measurement => "m 3", tags => [], - fields => [{"field", "field3"}], + fields => [{"field", {quoted, "field3"}}], timestamp => "${payload.timestamp 3}" }}, {"m4 field=\"\\\"field\\\\4\\\"\"", #{ measurement => "m4", tags => [], - fields => [{"field", "\"field\\4\""}], + fields => [{"field", {quoted, "\"field\\4\""}}], timestamp => undefined }}, { @@ -208,7 +232,11 @@ #{ measurement => "m6", tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + fields => [ + {"field", {quoted, "field6"}}, + {"field_a", {quoted, "field6a"}}, + {"field_b", {quoted, "field6b"}} + ], timestamp => undefined }}, { @@ -217,7 +245,11 @@ #{ measurement => " m7 ", tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}], + fields => [ + {"field", {quoted, "field7"}}, + {"field_a", "field7a"}, + {"field_b", {quoted, "field7b\\\n"}} + ], timestamp => undefined } }, @@ -227,7 +259,11 @@ #{ measurement => "m8", tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}], + fields => [ + {"field", {quoted, "field8"}}, + {"field_a", "field8a"}, + {"field_b", {quoted, "\"field\" = 8b"}} + ], timestamp => "${timestamp8}" } }, @@ -235,14 +271,18 @@ #{ measurement => "m\\9", tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + fields => [ + {"field=field", {quoted, "field9"}}, + {"field_a", "field9a"}, + {"field_b", {quoted, ""}} + ], timestamp => "${timestamp9}" }}, {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{ measurement => "m,10", tags => [], %% backslash should not be un-escaped in tag key - fields => [{"\"field\\\\\"", ""}], + fields => [{"\"field\\\\\"", {quoted, ""}}], timestamp => "${timestamp10}" }} ]). @@ -257,19 +297,19 @@ {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{ measurement => "m2", tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], + fields => [{"field", {quoted, "field \"2\",\n"}}], timestamp => undefined }}, {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{ measurement => "m 3", tags => [], - fields => [{"field", "field3"}], + fields => [{"field", {quoted, "field3"}}], timestamp => "${payload.timestamp 3}" }}, {" m4 field=\"\\\"field\\\\4\\\"\" ", #{ measurement => "m4", tags => [], - fields => [{"field", "\"field\\4\""}], + fields => [{"field", {quoted, "\"field\\4\""}}], timestamp => undefined }}, { @@ -288,7 +328,11 @@ #{ measurement => "m6", tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + fields => [ + {"field", {quoted, "field6"}}, + {"field_a", {quoted, "field6a"}}, + {"field_b", {quoted, "field6b"}} + ], timestamp => undefined }} ]).