diff --git a/changes/ee/fix-10087.en.md b/changes/ee/fix-10087.en.md new file mode 100644 index 000000000..fd6e10b7b --- /dev/null +++ b/changes/ee/fix-10087.en.md @@ -0,0 +1,2 @@ +Use default template `${timestamp}` if the `timestamp` config is empty (undefined) when inserting data in InfluxDB. +Prior to this change, InfluxDB bridge inserted a wrong timestamp when template is not provided. diff --git a/changes/ee/fix-10087.zh.md b/changes/ee/fix-10087.zh.md new file mode 100644 index 000000000..e08e61f37 --- /dev/null +++ b/changes/ee/fix-10087.zh.md @@ -0,0 +1,2 @@ +在 InfluxDB 中插入数据时,如果时间戳为空(未定义),则使用默认的占位符 `${timestamp}`。 +在此修复前,如果时间戳字段没有设置,InfluxDB 桥接使用了一个错误的时间戳。 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 6e5220ae0..8014dbdcc 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -527,7 +527,8 @@ t_start_ok(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?check_trace( begin @@ -685,7 +686,8 @@ t_const_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of @@ -740,7 +742,7 @@ t_boolean_variants(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?assertEqual(ok, send_message(Config, SentData)), @@ -805,7 +807,7 @@ t_bad_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( @@ -949,7 +951,7 @@ t_write_failure(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index a361e7035..7f5b56181 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -35,11 +35,15 @@ desc/1 ]). +-type ts_precision() :: ns | us | ms | s. + %% influxdb servers don't need parse -define(INFLUXDB_HOST_OPTIONS, #{ default_port => ?INFLUXDB_DEFAULT_PORT }). +-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -232,15 +236,14 @@ do_start_client( ClientConfig, Config = #{write_syntax := Lines} ) -> + Precision = maps:get(precision, Config, ms), case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client) of true -> State = #{ client => Client, - write_syntax => to_config( - Lines, proplists:get_value(precision, ClientConfig) - ) + write_syntax => to_config(Lines, Precision) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -407,27 +410,36 @@ to_config(Lines, Precision) -> to_config([], Acc, _Precision) -> lists:reverse(Acc); to_config([Item0 | Rest], Acc, Precision) -> - Ts = maps:get(timestamp, Item0, undefined), + Ts0 = maps:get(timestamp, Item0, undefined), + {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), Item = #{ measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), - timestamp => preproc_tmpl_timestamp(Ts, Precision), + timestamp => Ts, + precision => {FromPrecision, ToPrecision}, tags => to_kv_config(maps:get(tags, Item0)), fields => to_kv_config(maps:get(fields, Item0)) }, to_config(Rest, [Item | Acc], Precision). -preproc_tmpl_timestamp(undefined, <<"ns">>) -> - erlang:system_time(nanosecond); -preproc_tmpl_timestamp(undefined, <<"us">>) -> - erlang:system_time(microsecond); -preproc_tmpl_timestamp(undefined, <<"ms">>) -> - erlang:system_time(millisecond); -preproc_tmpl_timestamp(undefined, <<"s">>) -> - erlang:system_time(second); -preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) -> - Ts; -preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) -> - emqx_plugin_libs_rule:preproc_tmpl(Ts). +%% pre-process the timestamp template +%% returns a tuple of three elements: +%% 1. The timestamp template itself. +%% 2. The source timestamp precision (ms if the template ${timestamp} is used). +%% 3. The target timestamp precision (configured for the client). +preproc_tmpl_timestamp(undefined, Precision) -> + %% not configured, we default it to the message timestamp + preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision); +preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> + %% a const value is used which is very much unusual, but we have to add a special handling + {Ts, Precision, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> + preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); +preproc_tmpl_timestamp(<> = Ts, Precision) -> + {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> + %% a placehold is in use. e.g. ${payload.my_timestamp} + %% we can only hope it the value will be of the same precision in the configs + {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). @@ -470,7 +482,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := emqx_plugin_libs_rule:tmpl_token() | integer() + timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} } ]) -> {ok, [map()]} | {error, term()}. data_to_points(Data, SyntaxLines) -> @@ -529,16 +542,27 @@ line_to_point( #{ measurement := Measurement, tags := Tags, - fields := Fields + fields := Fields, + timestamp := Ts, + precision := Precision } = Item ) -> {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), - Item#{ + maps:without([precision], Item#{ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), tags => EncodedTags, - fields => EncodedFields - }. + fields => EncodedFields, + timestamp => maybe_convert_time_unit(Ts, Precision) + }). + +maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) -> + erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)). + +time_unit(s) -> second; +time_unit(ms) -> millisecond; +time_unit(us) -> microsecond; +time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl index f5e43c0bb..72fc11a67 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl @@ -227,5 +227,6 @@ test_query() -> {send_message, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, - <<"topic">> => <<"connector_test">> + <<"topic">> => <<"connector_test">>, + <<"timestamp">> => 1678220316257 }}.