From 1711823487859d1cf572a1e42febb14efb57f8b9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 12 Jan 2023 14:37:07 +0800 Subject: [PATCH] refactor: remove default value of timestamp field of influxdb --- .../src/emqx_plugin_libs.app.src | 2 +- .../src/emqx_plugin_libs_rule.erl | 2 + .../src/emqx_ee_bridge_influxdb.erl | 36 ++--- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 1 - lib-ee/emqx_ee_connector/rebar.config | 2 +- .../src/emqx_ee_connector_influxdb.erl | 135 ++++++++++-------- mix.exs | 2 +- 7 files changed, 92 insertions(+), 88 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index bcdcfe420..3120b8503 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugin_libs, [ {description, "EMQX Plugin utility libs"}, - {vsn, "4.3.4"}, + {vsn, "4.3.5"}, {modules, []}, {applications, [kernel, stdlib]}, {env, []} diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 57bdd16e5..969374309 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -63,6 +63,8 @@ can_topic_match_oneof/2 ]). +-export_type([tmpl_token/0]). + -compile({no_auto_import, [float/1]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 62c8b6ab7..b42c27832 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -196,33 +196,25 @@ to_influx_lines(RawLines) -> converter_influx_line(Line, AccIn) -> case string:tokens(str(Line), " ") of [MeasurementAndTags, Fields, Timestamp] -> - {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), - [ - #{ - measurement => Measurement, - tags => kv_pairs(Tags), - fields => kv_pairs(string:tokens(Fields, ",")), - timestamp => Timestamp - } - | AccIn - ]; + append_influx_item(MeasurementAndTags, Fields, Timestamp, AccIn); [MeasurementAndTags, Fields] -> - {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), - %% TODO: fix here both here and influxdb driver. - %% Default value should evaluated by InfluxDB. - [ - #{ - measurement => Measurement, - tags => kv_pairs(Tags), - fields => kv_pairs(string:tokens(Fields, ",")), - timestamp => "${timestamp}" - } - | AccIn - ]; + append_influx_item(MeasurementAndTags, Fields, undefined, AccIn); _ -> throw("Bad InfluxDB Line Protocol schema") end. +append_influx_item(MeasurementAndTags, Fields, Timestamp, Acc) -> + {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), + [ + #{ + measurement => Measurement, + tags => kv_pairs(Tags), + fields => kv_pairs(string:tokens(Fields, ",")), + timestamp => Timestamp + } + | Acc + ]. + split_measurement_and_tags(Subject) -> case string:tokens(Subject, ",") of [] -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 6331611d0..bb87a9f37 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -525,7 +525,6 @@ t_start_ok(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), <<"payload">> => Payload }, ?check_trace( diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index ab4c88396..262641d44 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,7 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.5"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 7974bf028..db99c4475 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -142,7 +142,11 @@ fields(common) -> [ {server, server()}, {precision, - mk(enum([ns, us, ms, s, m, h]), #{ + %% The influxdb only supports these 4 precision: + %% See "https://github.com/influxdata/influxdb/blob/ + %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for + %% more information. + mk(enum([ns, us, ms, s]), #{ required => false, default => ms, desc => ?DESC("precision") })} ]; @@ -210,9 +214,7 @@ start_client(InstId, Config) -> do_start_client( InstId, ClientConfig, - Config = #{ - write_syntax := Lines - } + Config = #{write_syntax := Lines} ) -> case influxdb:start_client(ClientConfig) of {ok, Client} -> @@ -220,7 +222,9 @@ do_start_client( true -> State = #{ client => Client, - write_syntax => to_config(Lines) + write_syntax => to_config( + Lines, proplists:get_value(precision, ClientConfig) + ) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -348,30 +352,33 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans -to_config(Lines) -> - to_config(Lines, []). +to_config(Lines, Precision) -> + to_config(Lines, [], Precision). -to_config([], Acc) -> +to_config([], Acc, _Precision) -> lists:reverse(Acc); -to_config( - [ - #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields - } - | Rest - ], - Acc -) -> - Res = #{ - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), - tags => to_kv_config(Tags), - fields => to_kv_config(Fields) +to_config([Item0 | Rest], Acc, Precision) -> + Ts = maps:get(timestamp, Item0, undefined), + Item = #{ + measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), + timestamp => preproc_tmpl_timestamp(Ts, Precision), + tags => to_kv_config(maps:get(tags, Item0)), + fields => to_kv_config(maps:get(fields, Item0)) }, - to_config(Rest, [Res | Acc]). + to_config(Rest, [Item | Acc], Precision). + +preproc_tmpl_timestamp(undefined, <<"ns">>) -> + erlang:system_time(nanosecond); +preproc_tmpl_timestamp(undefined, <<"us">>) -> + erlang:system_time(microsecond); +preproc_tmpl_timestamp(undefined, <<"ms">>) -> + erlang:system_time(millisecond); +preproc_tmpl_timestamp(undefined, <<"s">>) -> + erlang:system_time(second); +preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) -> + Ts; +preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) -> + emqx_plugin_libs_rule:preproc_tmpl(Ts). to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). @@ -414,7 +421,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := binary() + timestamp := emqx_plugin_libs_rule:tmpl_token() | integer() } ]) -> {ok, [map()]} | {error, term()}. data_to_points(Data, SyntaxLines) -> @@ -430,46 +437,50 @@ lines_to_points(_, [], Points, ErrorPoints) -> %% ignore trans succeeded points {error, ErrorPoints} end; -lines_to_points( - Data, - [ - #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields - } - | Rest - ], - ResultPointsAcc, - ErrorPointsAcc -) -> +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_list(Ts) +-> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of - [TimestampInt] when is_integer(TimestampInt) -> - {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), - {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), - Point = #{ - measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), - timestamp => TimestampInt, - tags => EncodedTags, - fields => EncodedFields - }, - case map_size(EncodedFields) =:= 0 of - true -> - %% influxdb client doesn't like empty field maps... - lines_to_points(Data, Rest, ResultPointsAcc, [ - {error, no_fields} | ErrorPointsAcc - ]); - false -> - lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) - end; - BadTimestamp -> + case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of + [TsInt] when is_integer(TsInt) -> + Item1 = Item#{timestamp => TsInt}, + continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); + BadTs -> lines_to_points(Data, Rest, ResultPointsAcc, [ - {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc + {error, {bad_timestamp, BadTs}} | ErrorPointsAcc ]) + end; +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_integer(Ts) +-> + continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). + +continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> + case line_to_point(Data, Item) of + #{fields := Fields} when map_size(Fields) =:= 0 -> + %% influxdb client doesn't like empty field maps... + ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc], + lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1); + Point -> + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) end. +line_to_point( + Data, + #{ + measurement := Measurement, + tags := Tags, + fields := Fields + } = Item +) -> + {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + Item#{ + measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), + tags => EncodedTags, + fields => EncodedFields + }. + maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, diff --git a/mix.exs b/mix.exs index 832f6e7ae..27d9b43ce 100644 --- a/mix.exs +++ b/mix.exs @@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},