Merge pull request #9734 from terry-xiaoyu/influxdb-default-value-of-write-syntax

refactor: remove default value of timestamp field of influxdb
This commit is contained in:
Xinyu Liu 2023-01-13 10:14:14 +08:00 committed by GitHub
commit 6b75077c47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 88 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugin_libs, [ {application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"}, {description, "EMQX Plugin utility libs"},
{vsn, "4.3.4"}, {vsn, "4.3.5"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []} {env, []}

View File

@ -63,6 +63,8 @@
can_topic_match_oneof/2 can_topic_match_oneof/2
]). ]).
-export_type([tmpl_token/0]).
-compile({no_auto_import, [float/1]}). -compile({no_auto_import, [float/1]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").

View File

@ -196,33 +196,25 @@ to_influx_lines(RawLines) ->
converter_influx_line(Line, AccIn) -> converter_influx_line(Line, AccIn) ->
case string:tokens(str(Line), " ") of case string:tokens(str(Line), " ") of
[MeasurementAndTags, Fields, Timestamp] -> [MeasurementAndTags, Fields, Timestamp] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), append_influx_item(MeasurementAndTags, Fields, Timestamp, AccIn);
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| AccIn
];
[MeasurementAndTags, Fields] -> [MeasurementAndTags, Fields] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), append_influx_item(MeasurementAndTags, Fields, undefined, AccIn);
%% TODO: fix here both here and influxdb driver.
%% Default value should evaluated by InfluxDB.
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => "${timestamp}"
}
| AccIn
];
_ -> _ ->
throw("Bad InfluxDB Line Protocol schema") throw("Bad InfluxDB Line Protocol schema")
end. end.
append_influx_item(MeasurementAndTags, Fields, Timestamp, Acc) ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| Acc
].
split_measurement_and_tags(Subject) -> split_measurement_and_tags(Subject) ->
case string:tokens(Subject, ",") of case string:tokens(Subject, ",") of
[] -> [] ->

View File

@ -525,7 +525,6 @@ t_start_ok(Config) ->
SentData = #{ SentData = #{
<<"clientid">> => ClientId, <<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME), <<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"payload">> => Payload <<"payload">> => Payload
}, },
?check_trace( ?check_trace(

View File

@ -1,7 +1,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.5"}}}, {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}},
{emqx, {path, "../../apps/emqx"}} {emqx, {path, "../../apps/emqx"}}
]}. ]}.

View File

@ -142,7 +142,11 @@ fields(common) ->
[ [
{server, server()}, {server, server()},
{precision, {precision,
mk(enum([ns, us, ms, s, m, h]), #{ %% The influxdb only supports these 4 precision:
%% See "https://github.com/influxdata/influxdb/blob/
%% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for
%% more information.
mk(enum([ns, us, ms, s]), #{
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})} })}
]; ];
@ -210,9 +214,7 @@ start_client(InstId, Config) ->
do_start_client( do_start_client(
InstId, InstId,
ClientConfig, ClientConfig,
Config = #{ Config = #{write_syntax := Lines}
write_syntax := Lines
}
) -> ) ->
case influxdb:start_client(ClientConfig) of case influxdb:start_client(ClientConfig) of
{ok, Client} -> {ok, Client} ->
@ -220,7 +222,9 @@ do_start_client(
true -> true ->
State = #{ State = #{
client => Client, client => Client,
write_syntax => to_config(Lines) write_syntax => to_config(
Lines, proplists:get_value(precision, ClientConfig)
)
}, },
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting influxdb connector success", msg => "starting influxdb connector success",
@ -348,30 +352,33 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans
to_config(Lines) -> to_config(Lines, Precision) ->
to_config(Lines, []). to_config(Lines, [], Precision).
to_config([], Acc) -> to_config([], Acc, _Precision) ->
lists:reverse(Acc); lists:reverse(Acc);
to_config( to_config([Item0 | Rest], Acc, Precision) ->
[ Ts = maps:get(timestamp, Item0, undefined),
#{ Item = #{
measurement := Measurement, measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
timestamp := Timestamp, timestamp => preproc_tmpl_timestamp(Ts, Precision),
tags := Tags, tags => to_kv_config(maps:get(tags, Item0)),
fields := Fields fields => to_kv_config(maps:get(fields, Item0))
}
| Rest
],
Acc
) ->
Res = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
tags => to_kv_config(Tags),
fields => to_kv_config(Fields)
}, },
to_config(Rest, [Res | Acc]). to_config(Rest, [Item | Acc], Precision).
preproc_tmpl_timestamp(undefined, <<"ns">>) ->
erlang:system_time(nanosecond);
preproc_tmpl_timestamp(undefined, <<"us">>) ->
erlang:system_time(microsecond);
preproc_tmpl_timestamp(undefined, <<"ms">>) ->
erlang:system_time(millisecond);
preproc_tmpl_timestamp(undefined, <<"s">>) ->
erlang:system_time(second);
preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) ->
Ts;
preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) ->
emqx_plugin_libs_rule:preproc_tmpl(Ts).
to_kv_config(KVfields) -> to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
@ -414,7 +421,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
fields := [{binary(), binary()}], fields := [{binary(), binary()}],
measurement := binary(), measurement := binary(),
tags := [{binary(), binary()}], tags := [{binary(), binary()}],
timestamp := binary() timestamp := emqx_plugin_libs_rule:tmpl_token() | integer()
} }
]) -> {ok, [map()]} | {error, term()}. ]) -> {ok, [map()]} | {error, term()}.
data_to_points(Data, SyntaxLines) -> data_to_points(Data, SyntaxLines) ->
@ -430,46 +437,50 @@ lines_to_points(_, [], Points, ErrorPoints) ->
%% ignore trans succeeded points %% ignore trans succeeded points
{error, ErrorPoints} {error, ErrorPoints}
end; end;
lines_to_points( lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
Data, is_list(Ts)
[ ->
#{
measurement := Measurement,
timestamp := Timestamp,
tags := Tags,
fields := Fields
}
| Rest
],
ResultPointsAcc,
ErrorPointsAcc
) ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of
[TimestampInt] when is_integer(TimestampInt) -> [TsInt] when is_integer(TsInt) ->
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), Item1 = Item#{timestamp => TsInt},
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
Point = #{ BadTs ->
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
timestamp => TimestampInt,
tags => EncodedTags,
fields => EncodedFields
},
case map_size(EncodedFields) =:= 0 of
true ->
%% influxdb client doesn't like empty field maps...
lines_to_points(Data, Rest, ResultPointsAcc, [
{error, no_fields} | ErrorPointsAcc
]);
false ->
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
end;
BadTimestamp ->
lines_to_points(Data, Rest, ResultPointsAcc, [ lines_to_points(Data, Rest, ResultPointsAcc, [
{error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc {error, {bad_timestamp, BadTs}} | ErrorPointsAcc
]) ])
end;
lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
is_integer(Ts)
->
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
case line_to_point(Data, Item) of
#{fields := Fields} when map_size(Fields) =:= 0 ->
%% influxdb client doesn't like empty field maps...
ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1);
Point ->
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
end. end.
line_to_point(
Data,
#{
measurement := Measurement,
tags := Tags,
fields := Fields
} = Item
) ->
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
Item#{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
tags => EncodedTags,
fields => EncodedFields
}.
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},

View File

@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[ [
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.4"}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},