chore: by CR comments

This commit is contained in:
Dennis Zhuang 2023-07-21 14:59:24 +08:00 committed by firest
parent c9550cc2e5
commit b34374c26f
2 changed files with 15 additions and 7 deletions

View File

@ -1,2 +1,2 @@
erlang 25.3.2.3 erlang 25.3.2-1
elixir 1.14.5-otp-25 elixir 1.14.5-otp-25

View File

@ -32,7 +32,9 @@
]). ]).
%% only for test %% only for test
-ifdef(TEST).
-export([is_unrecoverable_error/1]). -export([is_unrecoverable_error/1]).
-endif.
-type ts_precision() :: ns | us | ms | s. -type ts_precision() :: ns | us | ms | s.
@ -186,8 +188,8 @@ start_client(InstId, Config) ->
msg => "start greptimedb connector error", msg => "start greptimedb connector error",
connector => InstId, connector => InstId,
error => E, error => E,
reason => R, reason => emqx_utils:redact(R),
stack => S stack => emqx_utils:redact(S)
}), }),
{error, R} {error, R}
end. end.
@ -342,7 +344,7 @@ to_config(Lines, Precision) ->
to_config([], Acc, _Precision) -> to_config([], Acc, _Precision) ->
lists:reverse(Acc); lists:reverse(Acc);
to_config([Item0 | Rest], Acc, Precision) -> to_config([Item0 | Rest], Acc, Precision) ->
Ts0 = maps:get(timestamp, Item0, undefined), Ts0 = maps:get(timestamp, Item0, ?DEFAULT_TIMESTAMP_TMPL),
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
Item = #{ Item = #{
measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)), measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
@ -374,7 +376,11 @@ preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
{emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}. {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
to_kv_config(KVfields) -> to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). lists:foldl(
fun({K, V}, Acc) -> to_maps_config(K, V, Acc) end,
#{},
KVfields
).
to_maps_config(K, V, Res) -> to_maps_config(K, V, Res) ->
NK = emqx_placeholder:preproc_tmpl(bin(K)), NK = emqx_placeholder:preproc_tmpl(bin(K)),
@ -391,6 +397,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
{[Points | ListOfPoints], ErrAccIn}; {[Points | ListOfPoints], ErrAccIn};
{error, ErrorPoints} -> {error, ErrorPoints} ->
log_error_points(InstId, ErrorPoints), log_error_points(InstId, ErrorPoints),
{ListOfPoints, ErrAccIn + 1};
_ ->
{ListOfPoints, ErrAccIn + 1} {ListOfPoints, ErrAccIn + 1}
end end
end, end,
@ -522,8 +530,6 @@ value_type([UInt, <<"u">>]) when
is_integer(UInt) is_integer(UInt)
-> ->
greptimedb_values:uint64_value(UInt); greptimedb_values:uint64_value(UInt);
value_type([Float]) when is_float(Float) ->
Float;
value_type([<<"t">>]) -> value_type([<<"t">>]) ->
greptimedb_values:boolean_value(true); greptimedb_values:boolean_value(true);
value_type([<<"T">>]) -> value_type([<<"T">>]) ->
@ -544,6 +550,8 @@ value_type([<<"FALSE">>]) ->
greptimedb_values:boolean_value(false); greptimedb_values:boolean_value(false);
value_type([<<"False">>]) -> value_type([<<"False">>]) ->
greptimedb_values:boolean_value(false); greptimedb_values:boolean_value(false);
value_type([Float]) when is_float(Float) ->
Float;
value_type(Val) -> value_type(Val) ->
#{values => #{string_values => Val}, datatype => 'STRING'}. #{values => #{string_values => Val}, datatype => 'STRING'}.