Merge pull request #10087 from SergeTupchiy/EMQX-8926_handle_empty_timestamp_influxdb_bridge

fix: use default template if timestamp is empty (undefined) in InfluxDB
This commit is contained in:
Zaiming (Stone) Shi 2023-03-08 13:53:33 +01:00 committed by GitHub
commit 11d5fac57f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 28 deletions

View File

@ -0,0 +1,2 @@
Use default template `${timestamp}` if the `timestamp` config is empty (undefined) when inserting data in InfluxDB.
Prior to this change, InfluxDB bridge inserted a wrong timestamp when template is not provided.

View File

@ -0,0 +1,2 @@
在 InfluxDB 中插入数据时,如果时间戳为空(未定义),则使用默认的占位符 `${timestamp}`
在此修复前如果时间戳字段没有设置InfluxDB 桥接使用了一个错误的时间戳。

View File

@ -527,7 +527,8 @@ t_start_ok(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
?check_trace(
begin
@ -685,7 +686,8 @@ t_const_timestamp(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
@ -740,7 +742,7 @@ t_boolean_variants(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?assertEqual(ok, send_message(Config, SentData)),
@ -805,7 +807,7 @@ t_bad_timestamp(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?check_trace(
@ -949,7 +951,7 @@ t_write_failure(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?check_trace(

View File

@ -35,11 +35,15 @@
desc/1
]).
-type ts_precision() :: ns | us | ms | s.
%% influxdb servers don't need parse
-define(INFLUXDB_HOST_OPTIONS, #{
default_port => ?INFLUXDB_DEFAULT_PORT
}).
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> async_if_possible.
@ -232,15 +236,14 @@ do_start_client(
ClientConfig,
Config = #{write_syntax := Lines}
) ->
Precision = maps:get(precision, Config, ms),
case influxdb:start_client(ClientConfig) of
{ok, Client} ->
case influxdb:is_alive(Client) of
true ->
State = #{
client => Client,
write_syntax => to_config(
Lines, proplists:get_value(precision, ClientConfig)
)
write_syntax => to_config(Lines, Precision)
},
?SLOG(info, #{
msg => "starting influxdb connector success",
@ -407,27 +410,36 @@ to_config(Lines, Precision) ->
to_config([], Acc, _Precision) ->
lists:reverse(Acc);
to_config([Item0 | Rest], Acc, Precision) ->
Ts = maps:get(timestamp, Item0, undefined),
Ts0 = maps:get(timestamp, Item0, undefined),
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
Item = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
timestamp => preproc_tmpl_timestamp(Ts, Precision),
timestamp => Ts,
precision => {FromPrecision, ToPrecision},
tags => to_kv_config(maps:get(tags, Item0)),
fields => to_kv_config(maps:get(fields, Item0))
},
to_config(Rest, [Item | Acc], Precision).
preproc_tmpl_timestamp(undefined, <<"ns">>) ->
erlang:system_time(nanosecond);
preproc_tmpl_timestamp(undefined, <<"us">>) ->
erlang:system_time(microsecond);
preproc_tmpl_timestamp(undefined, <<"ms">>) ->
erlang:system_time(millisecond);
preproc_tmpl_timestamp(undefined, <<"s">>) ->
erlang:system_time(second);
preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) ->
Ts;
preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) ->
emqx_plugin_libs_rule:preproc_tmpl(Ts).
%% pre-process the timestamp template
%% returns a tuple of three elements:
%% 1. The timestamp template itself.
%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
%% 3. The target timestamp precision (configured for the client).
preproc_tmpl_timestamp(undefined, Precision) ->
%% not configured, we default it to the message timestamp
preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
%% a const value is used which is very much unusual, but we have to add a special handling
{Ts, Precision, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
{emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
%% a placehold is in use. e.g. ${payload.my_timestamp}
%% we can only hope it the value will be of the same precision in the configs
{emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}.
to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
@ -470,7 +482,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
fields := [{binary(), binary()}],
measurement := binary(),
tags := [{binary(), binary()}],
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer()
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(),
precision := {From :: ts_precision(), To :: ts_precision()}
}
]) -> {ok, [map()]} | {error, term()}.
data_to_points(Data, SyntaxLines) ->
@ -529,16 +542,27 @@ line_to_point(
#{
measurement := Measurement,
tags := Tags,
fields := Fields
fields := Fields,
timestamp := Ts,
precision := Precision
} = Item
) ->
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
Item#{
maps:without([precision], Item#{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
tags => EncodedTags,
fields => EncodedFields
}.
fields => EncodedFields,
timestamp => maybe_convert_time_unit(Ts, Precision)
}).
maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
time_unit(s) -> second;
time_unit(ms) -> millisecond;
time_unit(us) -> microsecond;
time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},

View File

@ -227,5 +227,6 @@ test_query() ->
{send_message, #{
<<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>
<<"topic">> => <<"connector_test">>,
<<"timestamp">> => 1678220316257
}}.