Compare commits
2 commits: master ... influxdb_l

Commits:
dd6b214231
456a14fe20
@@ -3,7 +3,7 @@
 {erl_opts, [debug_info]}.
 
 {deps, [
-    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}},
+    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.13"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
    {emqx_resource, {path, "../../apps/emqx_resource"}},
    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
@@ -382,7 +382,7 @@ field(Line) ->
 field_val([$" | Line]) ->
     {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
     %% Quoted val can be empty
-    {Val, strip_l(Line1, ?VAL_SEP)};
+    {{quoted, Val}, strip_l(Line1, ?VAL_SEP)};
 field_val(Line) ->
     %% Unquoted value should not be un-escaped according to InfluxDB protocol,
     %% as it can only hold float, integer, uinteger or boolean value.
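Editor's note, a hedged sketch of the parse-level change (consistent with the updated test expectations later in this diff): quoted field values are now tagged so they stay distinguishable from unquoted literals downstream, e.g.

    %% template:   m,tag=t f1="v1",f2=v2
    %% old parse:  fields => [{"f1", "v1"}, {"f2", "v2"}]
    %% new parse:  fields => [{"f1", {quoted, "v1"}}, {"f2", "v2"}]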
@@ -59,6 +59,11 @@
 
 -define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
 
+-define(IS_HTTP_ERROR(STATUS_CODE),
+    (is_integer(STATUS_CODE) andalso
+        (STATUS_CODE < 200 orelse STATUS_CODE >= 300))
+).
+
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
 callback_mode() -> async_if_possible.
@@ -541,7 +546,12 @@ reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
     ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
     Result = {error, {unrecoverable_error, <<"authorization failure">>}},
     emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+reply_callback(ReplyFunAndArgs, {ok, Code, _, Body}) when ?IS_HTTP_ERROR(Code) ->
+    ?tp(influxdb_connector_do_query_failure, #{error => Body}),
+    Result = {error, {unrecoverable_error, Body}},
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
 reply_callback(ReplyFunAndArgs, Result) ->
+    ?tp(influxdb_connector_do_query_ok, #{result => Result}),
     emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
 
 %% -------------------------------------------------------------------------------------------------
@@ -589,8 +599,17 @@ to_kv_config(KVfields) ->
 
 to_maps_config(K, V, Res) ->
     NK = emqx_placeholder:preproc_tmpl(bin(K)),
-    NV = emqx_placeholder:preproc_tmpl(bin(V)),
-    Res#{NK => NV}.
+    Res#{NK => preproc_quoted(V)}.
+
+preproc_quoted({quoted, V}) ->
+    {quoted, emqx_placeholder:preproc_tmpl(bin(V))};
+preproc_quoted(V) ->
+    emqx_placeholder:preproc_tmpl(bin(V)).
+
+proc_quoted({quoted, V}, Data, TransOpts) ->
+    {quoted, emqx_placeholder:proc_tmpl(V, Data, TransOpts)};
+proc_quoted(V, Data, TransOpts) ->
+    emqx_placeholder:proc_tmpl(V, Data, TransOpts).
 
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Data Trans
@@ -711,56 +730,115 @@ time_unit(ns) -> nanosecond.
 maps_config_to_data(K, V, {Data, Res}) ->
     KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
     VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
-    NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
-    NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
-    case {NK0, NV} of
+    NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
+    NV = proc_quoted(V, Data, VTransOptions),
+    case {NK, NV} of
         {[undefined], _} ->
             {Data, Res};
         %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
         {_, [undefined | _]} ->
             {Data, Res};
+        {_, {quoted, [undefined | _]}} ->
+            {Data, Res};
         _ ->
-            NK = list_to_binary(NK0),
-            {Data, Res#{NK => value_type(NV)}}
+            {Data, Res#{
+                list_to_binary(NK) => value_type(NV, tmpl_type(V))
+            }}
     end.
 
-value_type([Int, <<"i">>]) when
-    is_integer(Int)
-->
+value_type({quoted, ValList}, _) ->
+    {string_list, ValList};
+value_type([Int, <<"i">>], mixed) when is_integer(Int) ->
     {int, Int};
-value_type([UInt, <<"u">>]) when
-    is_integer(UInt)
-->
+value_type([UInt, <<"u">>], mixed) when is_integer(UInt) ->
     {uint, UInt};
 %% write `1`, `1.0`, `-1.0` all as float
 %% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
-value_type([Number]) when is_number(Number) ->
-    Number;
+value_type([Number], _) when is_number(Number) ->
+    {float, Number};
-value_type([<<"t">>]) ->
+value_type([<<"t">>], _) ->
     't';
-value_type([<<"T">>]) ->
+value_type([<<"T">>], _) ->
     'T';
-value_type([true]) ->
+value_type([true], _) ->
     'true';
-value_type([<<"TRUE">>]) ->
+value_type([<<"TRUE">>], _) ->
     'TRUE';
-value_type([<<"True">>]) ->
+value_type([<<"True">>], _) ->
     'True';
-value_type([<<"f">>]) ->
+value_type([<<"f">>], _) ->
     'f';
-value_type([<<"F">>]) ->
+value_type([<<"F">>], _) ->
     'F';
-value_type([false]) ->
+value_type([false], _) ->
     'false';
-value_type([<<"FALSE">>]) ->
+value_type([<<"FALSE">>], _) ->
     'FALSE';
-value_type([<<"False">>]) ->
+value_type([<<"False">>], _) ->
     'False';
-value_type(Val) ->
-    Val.
+value_type([Str], variable) when is_binary(Str) ->
+    Str;
+value_type([Str], literal) when is_binary(Str) ->
+    %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint.
+    %% otherwise, we should convert it to float.
+    NumStr = binary:part(Str, 0, byte_size(Str) - 1),
+    case binary:part(Str, byte_size(Str), -1) of
+        <<"i">> ->
+            maybe_convert_to_integer(NumStr, Str, int);
+        <<"u">> ->
+            maybe_convert_to_integer(NumStr, Str, uint);
+        _ ->
+            maybe_convert_to_float_str(Str)
+    end;
+value_type(Str, _) ->
+    Str.
+
+tmpl_type([{str, _}]) ->
+    literal;
+tmpl_type([{var, _}]) ->
+    variable;
+tmpl_type(_) ->
+    mixed.
+
+maybe_convert_to_integer(NumStr, String, Type) ->
+    try
+        Int = binary_to_integer(NumStr),
+        {Type, Int}
+    catch
+        error:badarg ->
+            maybe_convert_to_integer_f(NumStr, String, Type)
+    end.
+
+maybe_convert_to_integer_f(NumStr, String, Type) ->
+    try
+        Float = binary_to_float(NumStr),
+        {Type, erlang:floor(Float)}
+    catch
+        error:badarg ->
+            String
+    end.
+
+maybe_convert_to_float_str(NumStr) ->
+    try
+        _ = binary_to_float(NumStr),
+        %% NOTE: return a {float, String} to avoid precision loss when converting to float
+        {float, NumStr}
+    catch
+        error:badarg ->
+            maybe_convert_to_float_str_i(NumStr)
+    end.
+
+maybe_convert_to_float_str_i(NumStr) ->
+    try
+        _ = binary_to_integer(NumStr),
+        {float, NumStr}
+    catch
+        error:badarg ->
+            NumStr
+    end.
 
 key_filter(undefined) -> undefined;
-key_filter(Value) -> emqx_utils_conv:bin(Value).
+key_filter(Value) -> bin(Value).
 
 data_filter(undefined) -> undefined;
 data_filter(Int) when is_integer(Int) -> Int;
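Editor's sketch: the added maybe_convert_* helpers implement the rule in the comment above (a literal value suffixed with `i`/`u` becomes int/uint; any other numeric literal is kept as a float string to avoid precision loss). A simplified, self-contained illustration, assuming a hypothetical module name and omitting the floor-of-float fallback:

    -module(literal_value_sketch).
    -export([classify/1]).

    %% classify(<<"5i">>)  -> {int, 5}
    %% classify(<<"5u">>)  -> {uint, 5}
    %% classify(<<"1.1">>) -> {float, <<"1.1">>}
    %% classify(<<"au">>)  -> <<"au">>   (not numeric: left as-is, stored as string)
    classify(Str) when is_binary(Str), byte_size(Str) >= 2 ->
        %% split off a possible `i`/`u` type suffix
        NumStr = binary:part(Str, 0, byte_size(Str) - 1),
        case binary:part(Str, byte_size(Str), -1) of
            <<"i">> -> to_int(NumStr, Str, int);
            <<"u">> -> to_int(NumStr, Str, uint);
            _ -> to_float_str(Str)
        end;
    classify(Str) when is_binary(Str) ->
        to_float_str(Str).

    to_int(NumStr, Orig, Type) ->
        try
            {Type, binary_to_integer(NumStr)}
        catch
            error:badarg -> Orig
        end.

    to_float_str(Str) ->
        %% keep the original binary so no float precision is lost
        try _ = binary_to_float(Str), {float, Str} catch
            error:badarg ->
                try _ = binary_to_integer(Str), {float, Str} catch
                    error:badarg -> Str
                end
        end.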
@@ -799,6 +877,10 @@ str(S) when is_list(S) ->
 
 is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
     true;
+is_unrecoverable_error({error, {Code, _}}) when ?IS_HTTP_ERROR(Code) ->
+    true;
+is_unrecoverable_error({error, {Code, _, _Body}}) when ?IS_HTTP_ERROR(Code) ->
+    true;
 is_unrecoverable_error(_) ->
     false.
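Editor's note, a hedged illustration of the effect: with the new clauses any non-2xx HTTP reply is classified as unrecoverable, e.g.

    %% is_unrecoverable_error({error, {404, Headers}})       -> true
    %% is_unrecoverable_error({error, {503, Headers, Body}}) -> true
    %% is_unrecoverable_error({error, {204, Headers}})       -> false  (2xx is not an error)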
@@ -445,6 +445,7 @@ query_by_clientid(ClientId, Config) ->
             query => Query,
             dialect => #{
                 header => true,
+                annotations => [<<"datatype">>],
                 delimiter => <<";">>
             }
         }),
@@ -456,6 +457,7 @@ query_by_clientid(ClientId, Config) ->
         _Timeout = 10_000,
         _Retry = 0
     ),
+    %ct:pal("raw body: ~p", [RawBody0]),
     RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)),
     {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
     DecodedCSV1 = [
@@ -465,21 +467,26 @@ query_by_clientid(ClientId, Config) ->
     DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
     index_by_field(DecodedCSV2).
 
-csv_lines_to_maps([Title | Rest]) ->
-    csv_lines_to_maps(Rest, Title, _Acc = []);
+csv_lines_to_maps([[<<"#datatype">> | DataType], Title | Rest]) ->
+    csv_lines_to_maps(Rest, Title, _Acc = [], DataType);
 csv_lines_to_maps([]) ->
     [].
 
-csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
+csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc, DataType) ->
+    %ct:pal("data: ~p, title: ~p, datatype: ~p", [Data, Title, DataType]),
     Map = maps:from_list(lists:zip(Title, Data)),
-    csv_lines_to_maps(RestData, Title, [Map | Acc]);
+    MapT = lists:zip(Title, DataType),
+    [Type] = [T || {<<"_value">>, T} <- MapT],
+    csv_lines_to_maps(RestData, Title, [Map#{'_value_type' => Type} | Acc], DataType);
 %% ignore the csv title line
 %% it's always like this:
 %% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
 %% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
-csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
-    csv_lines_to_maps(RestData, Title, Acc);
-csv_lines_to_maps([], _Title, Acc) ->
+csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc, DataType) ->
+    csv_lines_to_maps(RestData, Title, Acc, DataType);
+csv_lines_to_maps([[<<"#datatype">> | DataType] | RestData], Title, Acc, _) ->
+    csv_lines_to_maps(RestData, Title, Acc, DataType);
+csv_lines_to_maps([], _Title, Acc, _DataType) ->
     lists:reverse(Acc).
 
 index_by_field(DecodedCSV) ->
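Editor's note: the `#datatype` row consumed here comes from InfluxDB's annotated-CSV output, enabled by the `annotations => [<<"datatype">>]` dialect above. A hedged example with the configured `;` delimiter (column types are illustrative):

    #datatype;string;long;dateTime:RFC3339;dateTime:RFC3339;dateTime:RFC3339;double;string;string
    result;table;_start;_stop;_time;_value;_field;_measurement

The comprehension `[Type] = [T || {<<"_value">>, T} <- MapT]` then picks out the type of the `_value` column (here `double`).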
@@ -494,11 +501,21 @@ assert_persisted_data(ClientId, Expected, PersistedData) ->
                 #{<<"_value">> := ExpectedValue},
                 maps:get(ClientIdIntKey, PersistedData)
             );
+        (Key, {ExpectedValue, ExpectedType}) ->
+            ?assertMatch(
+                #{<<"_value">> := ExpectedValue, '_value_type' := ExpectedType},
+                maps:get(atom_to_binary(Key), PersistedData),
+                #{
+                    key => Key,
+                    expected_value => ExpectedValue,
+                    expected_data_type => ExpectedType
+                }
+            );
         (Key, ExpectedValue) ->
             ?assertMatch(
                 #{<<"_value">> := ExpectedValue},
                 maps:get(atom_to_binary(Key), PersistedData),
-                #{expected => ExpectedValue}
+                #{key => Key, expected_value => ExpectedValue}
             )
     end,
     Expected
@@ -689,7 +706,15 @@ t_const_timestamp(Config) ->
             Config,
             #{
                 <<"write_syntax">> =>
-                    <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
+                    <<
+                        "mqtt,clientid=${clientid} "
+                        "foo=${payload.foo}i,"
+                        "foo1=${payload.foo},"
+                        "foo2=\"${payload.foo}\","
+                        "foo3=\"${payload.foo}somestr\","
+                        "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ",
+                        ConstBin/binary
+                    >>
             }
         )
     ),
@@ -709,7 +734,18 @@ t_const_timestamp(Config) ->
     end,
     ct:sleep(1500),
     PersistedData = query_by_clientid(ClientId, Config),
-    Expected = #{foo => <<"123">>},
+    Expected = #{
+        foo => {<<"123">>, <<"long">>},
+        foo1 => {<<"123">>, <<"double">>},
+        foo2 => {<<"123">>, <<"string">>},
+        foo3 => {<<"123somestr">>, <<"string">>},
+        bar => {<<"5">>, <<"long">>},
+        baz0 => {<<"1.1">>, <<"double">>},
+        baz1 => {<<"a">>, <<"string">>},
+        baz2 => {<<"ai">>, <<"string">>},
+        baz3 => {<<"au">>, <<"string">>},
+        baz4 => {<<"1u">>, <<"string">>}
+    },
     assert_persisted_data(ClientId, Expected, PersistedData),
     TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)),
     TimeReturned = pad_zero(TimeReturned0),
@@ -945,6 +981,7 @@ t_create_disconnected(Config) ->
                 econnrefused -> ok;
                 closed -> ok;
                 {closed, _} -> ok;
+                {shutdown, closed} -> ok;
                 _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
             end,
             ok
@@ -102,27 +102,51 @@
         #{
             measurement => "m7",
             tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
-            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
+            fields => [
+                {"field", {quoted, "field7"}},
+                {"field_a", "field7a"},
+                {"field_b", {quoted, "field7b"}}
+            ],
             timestamp => undefined
         }},
     {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
         #{
             measurement => "m8",
             tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
+            fields => [
+                {"field", {quoted, "field8"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "field8b"}}
+            ],
             timestamp => "${timestamp8}"
         }},
+    {
+        "m8a,tag=tag8,tag_a=\"${tag8a}\",tag_b=tag8b field=\"${field8}\","
+        "field_a=field8a,field_b=\"${field8b}\" ${timestamp8}",
+        #{
+            measurement => "m8a",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"${tag8a}\""}, {"tag_b", "tag8b"}],
+            fields => [
+                {"field", {quoted, "${field8}"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "${field8b}"}}
+            ],
+            timestamp => "${timestamp8}"
+        }
+    },
     {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
         #{
             measurement => "m9",
             tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            fields => [
+                {"field", {quoted, "field9"}}, {"field_a", "field9a"}, {"field_b", {quoted, ""}}
+            ],
             timestamp => "${timestamp9}"
         }},
     {"m10 field=\"\" ${timestamp10}", #{
         measurement => "m10",
         tags => [],
-        fields => [{"field", ""}],
+        fields => [{"field", {quoted, ""}}],
         timestamp => "${timestamp10}"
     }}
 ]).
@@ -177,19 +201,19 @@
     {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
         measurement => "m2",
         tags => [{"tag", "tag2"}],
-        fields => [{"field", "field \"2\",\n"}],
+        fields => [{"field", {quoted, "field \"2\",\n"}}],
         timestamp => undefined
     }},
     {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
         measurement => "m 3",
         tags => [],
-        fields => [{"field", "field3"}],
+        fields => [{"field", {quoted, "field3"}}],
         timestamp => "${payload.timestamp 3}"
     }},
     {"m4 field=\"\\\"field\\\\4\\\"\"", #{
         measurement => "m4",
         tags => [],
-        fields => [{"field", "\"field\\4\""}],
+        fields => [{"field", {quoted, "\"field\\4\""}}],
         timestamp => undefined
     }},
     {
@@ -208,7 +232,11 @@
         #{
             measurement => "m6",
             tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            fields => [
+                {"field", {quoted, "field6"}},
+                {"field_a", {quoted, "field6a"}},
+                {"field_b", {quoted, "field6b"}}
+            ],
             timestamp => undefined
         }},
     {
@@ -217,7 +245,11 @@
         #{
             measurement => " m7 ",
             tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
-            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
+            fields => [
+                {"field", {quoted, "field7"}},
+                {"field_a", "field7a"},
+                {"field_b", {quoted, "field7b\\\n"}}
+            ],
             timestamp => undefined
         }
     },
@@ -227,7 +259,11 @@
         #{
             measurement => "m8",
             tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
+            fields => [
+                {"field", {quoted, "field8"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "\"field\" = 8b"}}
+            ],
             timestamp => "${timestamp8}"
         }
     },
@@ -235,14 +271,18 @@
         #{
             measurement => "m\\9",
             tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            fields => [
+                {"field=field", {quoted, "field9"}},
+                {"field_a", "field9a"},
+                {"field_b", {quoted, ""}}
+            ],
             timestamp => "${timestamp9}"
         }},
     {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
         measurement => "m,10",
         tags => [],
         %% backslash should not be un-escaped in tag key
-        fields => [{"\"field\\\\\"", ""}],
+        fields => [{"\"field\\\\\"", {quoted, ""}}],
         timestamp => "${timestamp10}"
     }}
 ]).
@@ -257,19 +297,19 @@
     {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{
         measurement => "m2",
         tags => [{"tag", "tag2"}],
-        fields => [{"field", "field \"2\",\n"}],
+        fields => [{"field", {quoted, "field \"2\",\n"}}],
         timestamp => undefined
     }},
     {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{
         measurement => "m 3",
         tags => [],
-        fields => [{"field", "field3"}],
+        fields => [{"field", {quoted, "field3"}}],
         timestamp => "${payload.timestamp 3}"
     }},
     {" m4 field=\"\\\"field\\\\4\\\"\" ", #{
         measurement => "m4",
         tags => [],
-        fields => [{"field", "\"field\\4\""}],
+        fields => [{"field", {quoted, "\"field\\4\""}}],
         timestamp => undefined
     }},
     {
@@ -288,7 +328,11 @@
         #{
             measurement => "m6",
             tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            fields => [
+                {"field", {quoted, "field6"}},
+                {"field_a", {quoted, "field6a"}},
+                {"field_b", {quoted, "field6b"}}
+            ],
             timestamp => undefined
         }}
 ]).
@@ -0,0 +1 @@
+The bridges for InfluxDB have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.

@@ -0,0 +1 @@
+Fixed an issue where numeric literals written to InfluxDB via the line protocol were stored as string values.
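An illustration of the fix, using values from the updated test case above (field names are examples): with a write_syntax template such as

    mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i,baz0=1.1,baz1="a" ${timestamp}

the numeric literals bar=5i and baz0=1.1 are now stored with the InfluxDB types long and double respectively, and the quoted baz1="a" remains a string; before this fix such literals could end up stored as strings.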
mix.exs (2 lines changed)

@@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
-      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true},
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
       {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},