fix: batch write

This commit is contained in:
Dennis Zhuang 2023-07-06 12:12:08 +08:00 committed by firest
parent 6d9944a8e8
commit 91ebd90442
4 changed files with 1202 additions and 53 deletions

View File

@ -3,11 +3,10 @@
]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.1"}}}
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.1"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.

View File

@ -1,3 +1,298 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb).
-export([]).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-type write_syntax() :: list().
-reflect_type([write_syntax/0]).
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
-export([to_influx_lines/1]).
%% -------------------------------------------------------------------------------------------------
%% api
%% Builds the list of example bridge configurations for the given `Method'
%% (`get', `post' or `put'), keyed by the bridge type name.
%% The only bridge type handled here is `greptimedb_grpc_v1', so the summary
%% names the gRPC protocol instead of an HTTP API version.
conn_bridge_examples(Method) ->
    [
        #{
            <<"greptimedb_grpc_v1">> => #{
                summary => <<"Greptimedb gRPC V1 Bridge">>,
                value => values("greptimedb_grpc_v1", Method)
            }
        }
    ].
%% Example configuration values for `Protocol' and an API method.
%% `get' and `put' examples reuse the `post' example.
%% The type-specific options only contain keys the connector schema defines
%% (`dbname' and `server'); the connector requires `dbname', so the example
%% must carry it to be a usable configuration.
values(Protocol, get) ->
    values(Protocol, post);
values("greptimedb_grpc_v1", post) ->
    SupportUint = <<"uint_value=${payload.uint_key}u,">>,
    TypeOpts = #{
        dbname => <<"public">>,
        server => <<"127.0.0.1:4000">>
    },
    values(common, "greptimedb_grpc_v1", SupportUint, TypeOpts);
values(Protocol, put) ->
    values(Protocol, post).
%% Merges the protocol-specific options with the settings shared by every
%% example. On key clashes the shared settings win, because they are the
%% second argument of maps:merge/2.
%% `SupportUint' is a binary fragment spliced into the write syntax template.
values(common, Protocol, SupportUint, TypeOpts) ->
    WriteSyntax =
        <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
            "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
            "bool=${payload.bool}">>,
    ResourceOpts = #{
        batch_size => 100,
        batch_time => <<"20ms">>
    },
    Shared = #{
        type => list_to_atom(Protocol),
        name => <<"demo">>,
        enable => true,
        local_topic => <<"local/topic/#">>,
        write_syntax => WriteSyntax,
        precision => ms,
        resource_opts => ResourceOpts,
        server => <<"127.0.0.1:4000">>,
        ssl => #{enable => false}
    },
    maps:merge(TypeOpts, Shared).
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
%% HOCON schema namespace of this module.
namespace() ->
    "bridge_greptimedb".
%% This schema module declares no root fields.
roots() ->
    [].
%% Schema fields per struct name.
%% The "<method>_grpc_v1" names delegate to method_fields/2; the bare
%% `greptimedb_grpc_v1' atom yields the common bridge fields followed by the
%% connector fields.
fields("post_grpc_v1") ->
    method_fields(post, greptimedb_grpc_v1);
fields("put_grpc_v1") ->
    method_fields(put, greptimedb_grpc_v1);
fields("get_grpc_v1") ->
    method_fields(get, greptimedb_grpc_v1);
fields(greptimedb_grpc_v1 = Type) ->
    lists:append(greptimedb_bridge_common_fields(), connector_fields(Type)).
%% Field list for one API method:
%%   post -> common + connector + type/name
%%   get  -> common + connector + type/name + status fields
%%   put  -> common + connector
method_fields(post, ConnectorType) ->
    Common = greptimedb_bridge_common_fields(),
    Connector = connector_fields(ConnectorType),
    TypeName = type_name_fields(ConnectorType),
    lists:append([Common, Connector, TypeName]);
method_fields(get, ConnectorType) ->
    Common = greptimedb_bridge_common_fields(),
    Connector = connector_fields(ConnectorType),
    TypeName = type_name_fields(ConnectorType),
    Status = emqx_bridge_schema:status_fields(),
    lists:append([Common, Connector, TypeName, Status]);
method_fields(put, ConnectorType) ->
    Common = greptimedb_bridge_common_fields(),
    Connector = connector_fields(ConnectorType),
    lists:append(Common, Connector).
%% Fields shared by every method: the generic bridge fields, the
%% `write_syntax' field and the "resource_opts" fields, in that order.
greptimedb_bridge_common_fields() ->
    BridgeFields = emqx_bridge_schema:common_bridge_fields(),
    WriteSyntaxField = {write_syntax, fun write_syntax/1},
    ResourceFields = emqx_resource_schema:fields("resource_opts"),
    lists:append([BridgeFields, [WriteSyntaxField], ResourceFields]).
%% Fields contributed by the connector module for connector type `Type'.
connector_fields(Type) ->
    ConnectorModule = emqx_bridge_greptimedb_connector,
    ConnectorModule:fields(Type).
%% The two required identification fields: `type' (fixed to `Type') and
%% `name' (a binary).
type_name_fields(Type) ->
    TypeField = {type, hoconsc:mk(Type, #{required => true, desc => ?DESC("desc_type")})},
    NameField = {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})},
    [TypeField, NameField].
%% Description lookup for schema structs.
%% Only "config", the plain method names "get"/"put"/"post" and the
%% `greptimedb_grpc_v1' atom have a description; everything else is
%% `undefined'.
desc("config") ->
    ?DESC("desc_config");
desc(Method) when
    Method =:= "get" orelse Method =:= "put" orelse Method =:= "post"
->
    UpperMethod = string:to_upper(Method),
    ["Configuration for Greptimedb using `", UpperMethod, "` method."];
desc(greptimedb_grpc_v1) ->
    ?DESC(emqx_bridge_greptimedb_connector, "greptimedb_grpc_v1");
desc(_Other) ->
    undefined.
%% Schema accessor for the `write_syntax' field: returns the attribute named
%% by `Attr', or `undefined' for attributes this field does not define.
write_syntax(Attr) ->
    case Attr of
        type ->
            ?MODULE:write_syntax();
        required ->
            true;
        validator ->
            [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
        converter ->
            fun to_influx_lines/1;
        desc ->
            ?DESC("write_syntax");
        format ->
            <<"sql">>;
        _ ->
            undefined
    end.
%% Parses the raw write syntax (atom, binary or string) into a non-empty list
%% of line maps (`measurement', `tags', `fields', `timestamp').
%% Any failure inside the parser is logged and re-raised as a thrown,
%% flattened error message string.
to_influx_lines(RawLines) ->
    try
        influx_lines(str(RawLines), [])
    catch
        _Class:Why:Trace ->
            ErrMsg = lists:flatten(
                io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines])
            ),
            ?SLOG(error, #{msg => ErrMsg, error_reason => Why, stacktrace => Trace}),
            throw(ErrMsg)
    end.
%% Characters that may be backslash-escaped inside a measurement name
-define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
%% Characters that may be backslash-escaped inside tag keys, tag values and field keys
%% (also used for unquoted field values and timestamps)
-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
%% Characters that may be backslash-escaped inside a quoted field value
-define(FIELD_VAL_ESC_CHARS, [$", $\\]).
% Common separator for both tags and fields
-define(SEP, $\s).
%% Separates the measurement from the first tag
-define(MEASUREMENT_TAG_SEP, $,).
%% Separates a key from its value
-define(KEY_SEP, $=).
%% Separates consecutive key=value pairs
-define(VAL_SEP, $,).
%% Pattern matching any non-empty list; used as an assertion in matches
-define(NON_EMPTY, [_ | _]).
%% Parses line after line until the input is exhausted.
%% The final match asserts that at least one line was parsed; an input with
%% no lines fails with `badmatch'.
influx_lines([], ParsedRev) ->
    ?NON_EMPTY = lists:reverse(ParsedRev);
influx_lines(Input, ParsedRev) ->
    Trimmed = string:trim(Input, leading, "\s\n"),
    {ParsedRev1, Remaining} = influx_line(Trimmed, ParsedRev),
    influx_lines(Remaining, ParsedRev1).
%% Parses a single line into a map and pushes it onto `Acc'.
%% Returns `{Acc1, Rest}' where `Rest' is the unparsed remainder.
%% Measurement and fields must be non-empty (asserted by matching); tags and
%% the timestamp are optional.
influx_line([], Acc) ->
    {Acc, []};
influx_line(Line, Acc) ->
    {?NON_EMPTY = Measurement, AfterMeasurement} = measurement(Line),
    {Tags, AfterTags} = tags(AfterMeasurement),
    {?NON_EMPTY = Fields, AfterFields} = influx_fields(AfterTags),
    {Timestamp, Rest} = timestamp(AfterFields),
    Parsed = #{
        measurement => Measurement,
        tags => Tags,
        fields => Fields,
        timestamp => Timestamp
    },
    {[Parsed | Acc], Rest}.
%% Reads the measurement name: everything up to the first unescaped `,'
%% (tags follow) or space (fields follow).
measurement(Line) ->
    Terminators = [?MEASUREMENT_TAG_SEP, ?SEP],
    unescape(?MEASUREMENT_ESC_CHARS, Terminators, Line, []).
%% Parses the optional tag set. Tags are present only when the remainder
%% starts with the measurement/tag separator; otherwise the tag list is empty
%% and the input is returned untouched.
tags([?MEASUREMENT_TAG_SEP | TagPart]) ->
    tags1(TagPart, []);
tags(NoTags) ->
    {[], NoTags}.
%% Collects tags until the separator that introduces the field set.
%%
%% An empty remainder is invalid (fields are mandatory after tags), but the
%% recursion has to stop here; the failure surfaces later when the fields are
%% parsed.
tags1([] = Rest, TagsRev) ->
    {lists:reverse(TagsRev), Rest};
%% The accumulator must be non-empty for the separator to end the tag set, so
%% a line such as "m, field=field_val" is treated as invalid.
tags1([?SEP | _] = Rest, ?NON_EMPTY = TagsRev) ->
    {lists:reverse(TagsRev), Rest};
tags1(Input, TagsRev) ->
    {Parsed, Rest} = tag(Input),
    tags1(Rest, [Parsed | TagsRev]).
%% Parses one `key=value' tag; both key and value must be non-empty.
tag(Input) ->
    {?NON_EMPTY = TagKey, AfterKey} = key(Input),
    {?NON_EMPTY = TagVal, AfterVal} = tag_val(AfterKey),
    {{TagKey, TagVal}, AfterVal}.
%% Reads a tag value up to the next `,' or space, then drops a `,' that
%% separates it from a following pair (see strip_l/2).
tag_val(Input) ->
    Terminators = [?VAL_SEP, ?SEP],
    {TagVal, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, Terminators, Input, []),
    {TagVal, strip_l(Rest, ?VAL_SEP)}.
%% Parses the mandatory field set. The input must start with the separator;
%% any further leading spaces are skipped. Anything else fails with
%% `function_clause'.
influx_fields([?SEP | AfterSep]) ->
    FieldPart = string:trim(AfterSep, leading, "\s"),
    fields1(FieldPart, []).
%% Collects fields until a space, a newline or the end of input.
%% The timestamp is optional, so the field set may be the very last thing on
%% the line.
fields1([] = Rest, FieldsRev) ->
    {lists:reverse(FieldsRev), Rest};
fields1([Ch | _] = Rest, FieldsRev) when Ch =:= ?SEP; Ch =:= $\n ->
    {lists:reverse(FieldsRev), Rest};
fields1(Input, FieldsRev) ->
    {Parsed, Rest} = field(Input),
    fields1(Rest, [Parsed | FieldsRev]).
%% Parses one `key=value' field. The key must be non-empty; emptiness of the
%% value is decided by field_val/1.
field(Input) ->
    {?NON_EMPTY = FieldKey, AfterKey} = key(Input),
    {FieldVal, AfterVal} = field_val(AfterKey),
    {{FieldKey, FieldVal}, AfterVal}.
%% Reads a field value.
%% A quoted value runs to the closing double quote (which must exist) and may
%% be empty. An unquoted value runs to the next `,', space or newline and
%% must be non-empty.
field_val([$" | Quoted]) ->
    {Value, [$" | AfterQuote]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Quoted, []),
    {Value, strip_l(AfterQuote, ?VAL_SEP)};
field_val(Unquoted) ->
    %% According to the Greptimedb protocol an unquoted value should not be
    %% un-escaped, as it can only hold a float, integer, uinteger or boolean.
    %% Templates are possible here though, so un-escaping is applied anyway;
    %% it also helps to detect invalid lines such as
    %% "m,tag=1 field= ${timestamp}".
    Terminators = [?VAL_SEP, ?SEP, $\n],
    {Value, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, Terminators, Unquoted, []),
    ?NON_EMPTY = Value,
    {Value, strip_l(Rest, ?VAL_SEP)}.
%% Parses the optional timestamp that follows the field set.
%% Without a leading separator there is no timestamp and `undefined' is
%% returned together with the untouched input.
timestamp([?SEP | AfterSep]) ->
    TsPart = string:trim(AfterSep, leading, "\s"),
    %% Like an unquoted field value, the timestamp is un-escaped so that it is
    %% validated and escaped characters inside a template are handled.
    {RawTs, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], TsPart, []),
    {timestamp1(RawTs), Rest};
timestamp(NoTimestamp) ->
    {undefined, NoTimestamp}.
%% Normalises an empty timestamp to `undefined'; a non-empty one is returned
%% unchanged.
timestamp1(?NON_EMPTY = Ts) ->
    Ts;
timestamp1(_Empty) ->
    undefined.
%% Reads a tag or field key: everything up to the first unescaped `=',
%% which is then dropped according to strip_l/2.
key(Input) ->
    {Name, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Input, []),
    {Name, strip_l(Rest, ?KEY_SEP)}.
%% Drops the leading character `Ch' only when it sits between two pairs,
%% i.e. when more input follows and the next character is not a space.
%% A trailing `Ch' (as in "m,tag=val, field=val") is kept on purpose so that
%% parsing fails later.
strip_l([Ch, Next | Tail], Ch) when Next =/= ?SEP ->
    [Next | Tail];
strip_l(Unchanged, _Ch) ->
    Unchanged.
%% Consumes characters until one of `SepChars' (or the end of input) and
%% returns `{Consumed, Rest}' with `Rest' starting at the separator.
%%
%% A backslash followed by a character from `EscapeChars' yields that
%% character alone. A backslash followed by any other character is kept
%% verbatim, unless the backslash itself is in `EscapeChars': then no case
%% branch matches and the call fails with `case_clause' (invalid input).
%% Likewise, an unescaped character from `EscapeChars' that is not a
%% separator fails with `case_clause'.
unescape(EscapeChars, SepChars, [$\\, Next | Rest], Acc) ->
    BackslashEscapable = lists:member($\\, EscapeChars),
    Acc1 =
        case lists:member(Next, EscapeChars) of
            true ->
                [Next | Acc];
            false when not BackslashEscapable ->
                [Next, $\\ | Acc]
        end,
    unescape(EscapeChars, SepChars, Rest, Acc1);
unescape(_EscapeChars, _SepChars, [], Acc) ->
    {lists:reverse(Acc), []};
unescape(EscapeChars, SepChars, [Ch | Rest] = Input, Acc) ->
    NeedsEscaping = lists:member(Ch, EscapeChars),
    case lists:member(Ch, SepChars) of
        true ->
            {lists:reverse(Acc), Input};
        false when not NeedsEscaping ->
            unescape(EscapeChars, SepChars, Rest, [Ch | Acc])
    end.
%% Converts an atom or binary to a list; a list is returned unchanged.
%% Binaries are converted byte by byte (binary_to_list/1).
str(Chars) when is_list(Chars) ->
    Chars;
str(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).

View File

@ -1,84 +1,179 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb_connector).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% `emqx_resource' API
-import(hoconsc, [mk/2, enum/1, ref/2]).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_get_status/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4
on_get_status/2
]).
-define(GREPTIMEDB_DEFAULT_PORT, 4001).
-export([
roots/0,
namespace/0,
fields/1,
desc/1
]).
%% only for test
-export([is_unrecoverable_error/1]).
-type ts_precision() :: ns | us | ms | s.
%% Allocatable resources
-define(greptime_client, greptime_client).
-define(GREPTIMEDB_DEFAULT_PORT, 4000).
-define(DEFAULT_DB, <<"public">>).
-define(GREPTIMEDB_HOST_OPTIONS, #{
default_port => ?GREPTIMEDB_DEFAULT_PORT
}).
%%-------------------------------------------------------------------------------------
%% `emqx_resource' API
%%-------------------------------------------------------------------------------------
callback_mode() -> async_if_possible.
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
%% Starts the connector by starting a greptimedb client.
%% InstId is used as the pool name by the greptimedb client itself, so no
%% separate pool name is allocated here (see greptimedb:start_client/1).
on_start(InstId, Config) ->
    start_client(InstId, Config).
on_stop(_InstId, #{client := Client}) ->
greptimedb:stop_client(Client).
%% Stops the client recorded among the resources allocated for `InstId'.
%% When no client was recorded there is nothing to stop and `ok' is returned.
on_stop(InstId, _State) ->
    Allocated = emqx_resource:get_allocated_resources(InstId),
    case Allocated of
        #{?greptime_client := Client} ->
            greptimedb:stop_client(Client);
        _ ->
            ok
    end.
on_get_status(_InstId, _State) ->
%% FIXME
connected.
%% Converts a single message into points and writes them.
%% A conversion failure is logged point by point and reported as an
%% unrecoverable error.
on_query(InstId, {send_message, Data}, #{write_syntax := SyntaxLines, client := Client} = _State) ->
    Converted = data_to_points(Data, SyntaxLines),
    case Converted of
        {error, ErrorPoints} ->
            ?tp(
                greptimedb_connector_send_query_error,
                #{batch => false, mode => sync, error => ErrorPoints}
            ),
            log_error_points(InstId, ErrorPoints),
            {error, {unrecoverable_error, ErrorPoints}};
        {ok, Points} ->
            ?tp(
                greptimedb_connector_send_query,
                #{points => Points, batch => false, mode => sync}
            ),
            do_query(InstId, Client, Points)
    end.
on_query(_InstanceId, {send_message, _Message}, _State) ->
todo.
%% Converts a whole batch into points and writes them in one call.
%% If any message of the batch fails to convert, the entire batch is
%% rejected with an unrecoverable error.
on_batch_query(InstId, BatchData, #{write_syntax := SyntaxLines, client := Client} = _State) ->
    Parsed = parse_batch_data(InstId, BatchData, SyntaxLines),
    case Parsed of
        {error, Reason} ->
            ?tp(
                greptimedb_connector_send_query_error,
                #{batch => true, mode => sync, error => Reason}
            ),
            {error, {unrecoverable_error, Reason}};
        {ok, Points} ->
            ?tp(
                greptimedb_connector_send_query,
                #{points => Points, batch => true, mode => sync}
            ),
            do_query(InstId, Client, Points)
    end.
on_query_async(_InstanceId, {send_message, _Message}, _ReplyFunAndArgs0, _State) ->
todo.
%% Maps the client's liveness (greptimedb:is_alive/1) to a resource status:
%% `connected' when alive, `disconnected' otherwise.
on_get_status(_InstId, #{client := Client}) ->
    Alive = greptimedb:is_alive(Client),
    case Alive of
        false -> disconnected;
        true -> connected
    end.
on_batch_query(
_ResourceID,
_BatchReq,
_State
) ->
todo.
%% -------------------------------------------------------------------------------------------------
%% schema
%% HOCON schema namespace of the connector.
namespace() ->
    connector_greptimedb.
on_batch_query_async(
_InstId,
_BatchData,
{_ReplyFun, _Args},
_State
) ->
todo.
%% Single root `config': a union whose only member is this module's
%% `greptimedb_grpc_v1' struct.
roots() ->
    GrpcV1 = hoconsc:ref(?MODULE, greptimedb_grpc_v1),
    ConfigType = hoconsc:union([GrpcV1]),
    [{config, #{type => ConfigType}}].
%% Connector schema fields.
%% `common' holds the server address and timestamp precision;
%% `greptimedb_grpc_v1' adds the required `dbname' and the SSL fields.
fields(common) ->
[
{server, server()},
{precision,
%% Only these 4 precisions are offered.
%% NOTE(review): the link originally written here pointed at
%% "github.com/influxdata/greptimedb", which looks like a renamed
%% InfluxDB reference (influxdata/influxdb, models/points.go#L487);
%% confirm which GreptimeDB document should be cited instead.
mk(enum([ns, us, ms, s]), #{
required => false, default => ms, desc => ?DESC("precision")
})}
];
fields(greptimedb_grpc_v1) ->
fields(common) ++
[
{dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}
] ++ emqx_connector_schema_lib:ssl_fields().
%% Schema of the `server' field: optional, defaults to "127.0.0.1:4000" and
%% runs through convert_server/2 before validation.
server() ->
    emqx_schema:servers_sc(
        #{
            required => false,
            default => <<"127.0.0.1:4000">>,
            desc => ?DESC("server"),
            converter => fun convert_server/2
        },
        ?GREPTIMEDB_HOST_OPTIONS
    ).
%% Descriptions of the two structs this module defines.
desc(greptimedb_grpc_v1) ->
    ?DESC("greptimedb_grpc_v1");
desc(common) ->
    ?DESC("common").
%% -------------------------------------------------------------------------------------------------
%% internal functions
start_client(InstId, Config) ->
ClientConfig = client_config(InstId, Config),
?SLOG(info, #{
msg => "starting GreptimeDB connector",
msg => "starting greptimedb connector",
connector => InstId,
config => emqx_utils:redact(Config),
client_config => emqx_utils:redact(ClientConfig)
}),
try
case greptimedb:start_client(ClientConfig) of
{ok, Client} ->
{ok, #{client => Client}};
{error, Reason} ->
?tp(greptimedb_connector_start_failed, #{error => Reason}),
?SLOG(warning, #{
msg => "failed_to_start_greptimedb_connector",
connector => InstId,
reason => Reason
}),
{error, Reason}
end
try do_start_client(InstId, ClientConfig, Config) of
Res = {ok, #{client := Client}} ->
ok = emqx_resource:allocate_resource(InstId, ?greptime_client, Client),
Res;
{error, Reason} ->
{error, Reason}
catch
E:R:S ->
?tp(greptimedb_connector_start_exception, #{error => {E, R}}),
@ -92,9 +187,64 @@ start_client(InstId, Config) ->
{error, R}
end.
%% Starts the greptimedb client and builds the connector state.
%%
%% Returns `{ok, State}' (with `client', `dbname' and the pre-processed
%% `write_syntax') when the client starts and reports itself alive, and
%% `{error, Reason}' otherwise.
%% If a client is already started under the same configuration, it is stopped
%% and the start is attempted again.
do_start_client(
InstId,
ClientConfig,
Config = #{write_syntax := Lines}
) ->
%% Precision defaults to milliseconds when not configured
Precision = maps:get(precision, Config, ms),
case greptimedb:start_client(ClientConfig) of
{ok, Client} ->
%% The second argument makes is_alive return {false, Reason} on failure
%% (that is the shape matched below)
case greptimedb:is_alive(Client, true) of
true ->
State = #{
client => Client,
dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB),
%% Templates are compiled once here, not on every query
write_syntax => to_config(Lines, Precision)
},
?SLOG(info, #{
msg => "starting greptimedb connector success",
connector => InstId,
client => redact_auth(Client),
state => redact_auth(State)
}),
{ok, State};
{false, Reason} ->
?tp(greptimedb_connector_start_failed, #{
error => greptimedb_client_not_alive, reason => Reason
}),
?SLOG(warning, #{
msg => "failed_to_start_greptimedb_connector",
connector => InstId,
client => redact_auth(Client),
reason => Reason
}),
%% no leak
_ = greptimedb:stop_client(Client),
{error, greptimedb_client_not_alive}
end;
{error, {already_started, Client0}} ->
?tp(greptimedb_connector_start_already_started, #{}),
?SLOG(info, #{
msg => "restarting greptimedb connector, found already started client",
connector => InstId,
old_client => redact_auth(Client0)
}),
%% Stop the stale client, then retry with the same arguments.
%% NOTE(review): this recursion has no retry limit - presumably
%% stop_client always clears the already_started condition; confirm.
_ = greptimedb:stop_client(Client0),
do_start_client(InstId, ClientConfig, Config);
{error, Reason} ->
?tp(greptimedb_connector_start_failed, #{error => Reason}),
?SLOG(warning, #{
msg => "failed_to_start_greptimedb_connector",
connector => InstId,
reason => Reason
}),
{error, Reason}
end.
client_config(
InstId,
_Config = #{
Config = #{
server := Server
}
) ->
@ -103,12 +253,369 @@ client_config(
{endpoints, [{http, str(Host), Port}]},
{pool_size, erlang:system_info(schedulers)},
{pool, InstId},
{pool_type, random}
{pool_type, random},
{timeunit, maps:get(precision, Config, ms)}
] ++ protocol_config(Config).
%% Client options derived from the bridge config: database name, optional
%% basic auth and the SSL settings, in that order.
protocol_config(#{dbname := DbName, ssl := SSL} = Config) ->
    DbOpts = [{dbname, DbName}],
    AuthOpts = auth(Config),
    SslOpts = ssl_config(SSL),
    lists:append([DbOpts, AuthOpts, SslOpts]).
%% Translates the `ssl' config map into client options.
%% With SSL enabled the transport is switched to `ssl' and the options come
%% from emqx_tls_lib:to_client_opts/1.
ssl_config(#{enable := true} = SSL) ->
    TransportOpts = emqx_tls_lib:to_client_opts(SSL),
    [
        {https_enabled, true},
        {transport, ssl},
        {transport_opts, TransportOpts}
    ];
ssl_config(#{enable := false}) ->
    [{https_enabled, false}].
%% Basic-auth client option when both `username' and `password' are present
%% in the config; an empty option list otherwise.
auth(Config) ->
    case Config of
        #{username := User, password := Pass} ->
            Credentials = #{username => User, password => Pass},
            [{auth, {basic, Credentials}}];
        _ ->
            []
    end.
%% Redacts `Term' for logging, additionally treating keys accepted by
%% is_auth_key/1 as sensitive.
redact_auth(Term) ->
    IsSensitive = fun is_auth_key/1,
    emqx_utils:redact(Term, IsSensitive).
%% True only for a binary key equal to "authorization", ignoring case.
is_auth_key(Key) when not is_binary(Key) ->
    false;
is_auth_key(Key) ->
    IgnoreCase = true,
    string:equal("authorization", Key, IgnoreCase).
%% -------------------------------------------------------------------------------------------------
%% Query
%% Writes `Points' through the client and maps the outcome:
%%   {ok, _}              -> result of the debug log call
%%   {error, {401, _, _}} -> unrecoverable "authorization failure"
%%   {error, Reason}      -> unrecoverable or recoverable error, as decided
%%                           by is_unrecoverable_error/1
do_query(InstId, Client, Points) ->
    WriteResult = greptimedb:write_batch(Client, Points),
    case WriteResult of
        {ok, _} ->
            ?SLOG(debug, #{
                msg => "greptimedb write point success",
                connector => InstId,
                points => Points
            });
        {error, {401, _, _}} ->
            ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
            ?SLOG(error, #{
                msg => "greptimedb_authorization_failed",
                client => redact_auth(Client),
                connector => InstId
            }),
            {error, {unrecoverable_error, <<"authorization failure">>}};
        {error, Reason} = Failure ->
            ?tp(greptimedb_connector_do_query_failure, #{error => Reason}),
            ?SLOG(error, #{
                msg => "greptimedb write point failed",
                connector => InstId,
                reason => Reason
            }),
            Severity =
                case is_unrecoverable_error(Failure) of
                    true -> unrecoverable_error;
                    false -> recoverable_error
                end,
            {error, {Severity, Reason}}
    end.
%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans
%% Pre-processes every parsed write-syntax line for the given client
%% precision; the result keeps the order of `Lines'.
to_config(Lines, Precision) ->
    EmptyAcc = [],
    to_config(Lines, EmptyAcc, Precision).
%% Accumulator loop behind to_config/2: compiles the measurement, tag, field
%% and timestamp templates of each line and records the
%% `{FromPrecision, ToPrecision}' pair used later for timestamp conversion.
to_config([], Acc, _Precision) ->
    lists:reverse(Acc);
to_config([Line | More], Acc, Precision) ->
    RawTs = maps:get(timestamp, Line, undefined),
    {TsTmpl, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(RawTs, Precision),
    MeasurementTmpl = emqx_placeholder:preproc_tmpl(maps:get(measurement, Line)),
    TagTmpls = to_kv_config(maps:get(tags, Line)),
    FieldTmpls = to_kv_config(maps:get(fields, Line)),
    Compiled = #{
        measurement => MeasurementTmpl,
        timestamp => TsTmpl,
        precision => {FromPrecision, ToPrecision},
        tags => TagTmpls,
        fields => FieldTmpls
    },
    to_config(More, [Compiled | Acc], Precision).
%% Pre-processes the timestamp template.
%% Returns `{Timestamp, FromPrecision, ToPrecision}':
%%   1. the compiled timestamp template (or the integer itself for a constant),
%%   2. the precision the rendered value is expected to have
%%      (`ms' when the default ${timestamp} template is used),
%%   3. the target precision configured for the client.
preproc_tmpl_timestamp(undefined, ClientPrecision) ->
    %% nothing configured: fall back to the message timestamp
    preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, ClientPrecision);
preproc_tmpl_timestamp(Const, ClientPrecision) when is_integer(Const) ->
    %% a constant timestamp is very unusual, but it needs special handling
    {Const, ClientPrecision, ClientPrecision};
preproc_tmpl_timestamp(Chars, ClientPrecision) when is_list(Chars) ->
    preproc_tmpl_timestamp(iolist_to_binary(Chars), ClientPrecision);
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Tmpl, ClientPrecision) ->
    {emqx_placeholder:preproc_tmpl(Tmpl), ms, ClientPrecision};
preproc_tmpl_timestamp(Tmpl, ClientPrecision) when is_binary(Tmpl) ->
    %% a custom placeholder is in use, e.g. ${payload.my_timestamp};
    %% its value is assumed to already be in the configured precision
    Compiled = emqx_placeholder:preproc_tmpl(Tmpl),
    {Compiled, ClientPrecision, ClientPrecision}.
%% Turns a key/value proplist into a map whose keys and values are compiled
%% templates.
to_kv_config(KVfields) ->
    AsMap = proplists:to_map(KVfields),
    maps:fold(fun to_maps_config/3, #{}, AsMap).
%% Fold step of to_kv_config/1: compiles both key and value as templates and
%% stores the pair in the accumulator map.
to_maps_config(K, V, Res) ->
    KeyTmpl = emqx_placeholder:preproc_tmpl(bin(K)),
    ValTmpl = emqx_placeholder:preproc_tmpl(bin(V)),
    Res#{KeyTmpl => ValTmpl}.
%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Data Trans
%% Converts every `{send_message, Data}' of a batch into points.
%% Returns `{ok, Points}' (one flat list) when all messages convert, and
%% `{error, points_trans_failed}' when at least one fails; failures are
%% logged individually and counted.
parse_batch_data(InstId, BatchData, SyntaxLines) ->
    Collect = fun({send_message, Data}, {PointsAcc, ErrCount}) ->
        case data_to_points(Data, SyntaxLines) of
            {ok, Converted} ->
                {[Converted | PointsAcc], ErrCount};
            {error, ErrorPoints} ->
                log_error_points(InstId, ErrorPoints),
                {PointsAcc, ErrCount + 1}
        end
    end,
    case lists:foldl(Collect, {[], 0}, BatchData) of
        {Nested, 0} ->
            {ok, lists:flatten(Nested)};
        {_Nested, Failed} ->
            ?SLOG(error, #{
                msg => io_lib:format("Greptimedb trans point failed, count: ~p", [Failed]),
                connector => InstId,
                reason => points_trans_failed
            }),
            {error, points_trans_failed}
    end.
%% Renders all pre-processed syntax lines against one message.
%% Returns `{ok, Points}' where each point is `{TableName, [PointMap]}' as
%% built by line_to_point/2, or `{error, ErrorPoints}' if any line fails.
%% The spec mirrors what to_config/3 produces: tags and fields are maps of
%% compiled templates (see to_kv_config/1), not proplists.
-spec data_to_points(map(), [
    #{
        fields := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
        measurement := emqx_placeholder:tmpl_token(),
        tags := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
        timestamp := emqx_placeholder:tmpl_token() | integer(),
        precision := {From :: ts_precision(), To :: ts_precision()}
    }
]) -> {ok, [{TableName :: term(), [map()]}]} | {error, term()}.
data_to_points(Data, SyntaxLines) ->
    lines_to_points(Data, SyntaxLines, [], []).
%% Renders the syntax lines one by one, collecting points and errors.
%% The rows of one message are treated as strongly correlated: as soon as a
%% single row fails, the whole conversion is reported as failed and the
%% successfully converted points are discarded.
lines_to_points(_Data, [], Points, []) ->
    {ok, Points};
lines_to_points(_Data, [], _Points, ErrorPoints) ->
    %% ignore the points that did convert
    {error, ErrorPoints};
lines_to_points(Data, [#{timestamp := TsTmpl} = Item | Rest], PointsAcc, ErrorsAcc) when
    is_list(TsTmpl)
->
    %% templated timestamp: render it, then require an integer
    RenderOpts = #{return => rawlist, var_trans => fun data_filter/1},
    Rendered = emqx_placeholder:proc_tmpl(TsTmpl, Data, RenderOpts),
    case parse_timestamp(Rendered) of
        {ok, TsInt} ->
            continue_lines_to_points(
                Data, Item#{timestamp => TsInt}, Rest, PointsAcc, ErrorsAcc
            );
        {error, BadTs} ->
            ErrorsAcc1 = [{error, {bad_timestamp, BadTs}} | ErrorsAcc],
            lines_to_points(Data, Rest, PointsAcc, ErrorsAcc1)
    end;
lines_to_points(Data, [#{timestamp := TsConst} = Item | Rest], PointsAcc, ErrorsAcc) when
    is_integer(TsConst)
->
    %% constant timestamp: nothing to render
    continue_lines_to_points(Data, Item, Rest, PointsAcc, ErrorsAcc).
%% Turns a rendered timestamp (the raw list returned by the template
%% renderer) into an integer.
%% Returns `{ok, Integer}' for a single integer or a single binary holding
%% an integer, and `{error, Bad}' for everything else.
%% Inputs of any other shape (e.g. several rendered parts) are reported as
%% `{error, Bad}' too, so the caller records a `bad_timestamp' instead of
%% crashing with `function_clause'.
parse_timestamp([TsInt]) when is_integer(TsInt) ->
    {ok, TsInt};
parse_timestamp([TsBin]) when is_binary(TsBin) ->
    try
        {ok, binary_to_integer(TsBin)}
    catch
        %% binary_to_integer/1 only fails with badarg; do not hide anything else
        error:badarg ->
            {error, TsBin}
    end;
parse_timestamp([Bad]) ->
    {error, Bad};
parse_timestamp(Bad) ->
    {error, Bad}.
%% Renders one line into a point and continues with the remaining lines.
%% line_to_point/2 returns `{TableName, [PointMap]}', so the empty-fields
%% check has to look inside that tuple; matching a bare map would never
%% succeed and points without fields would reach the client.
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
    case line_to_point(Data, Item) of
        {_TableName, [#{fields := Fields}]} when map_size(Fields) =:= 0 ->
            %% greptimedb client doesn't like empty field maps...
            ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
            lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1);
        Point ->
            lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
    end.
%% Renders one pre-processed line against `Data'.
%% Returns `{TableName, [Point]}' where `Point' is the line with rendered
%% tags and fields, the timestamp converted to the target precision and the
%% `precision' key removed.
line_to_point(
    Data,
    #{
        measurement := Measurement,
        tags := Tags,
        fields := Fields,
        timestamp := Ts,
        precision := Precision
    } = Item
) ->
    {_, RenderedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
    {_, RenderedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
    TableName = emqx_placeholder:proc_tmpl(Measurement, Data),
    Rendered = Item#{
        tags => RenderedTags,
        fields => RenderedFields,
        timestamp => maybe_convert_time_unit(Ts, Precision)
    },
    Point = maps:without([precision], Rendered),
    {TableName, [Point]}.
%% Converts `Ts' from the source precision to the target precision using
%% erlang:convert_time_unit/3.
maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
    FromUnit = time_unit(FromPrecision),
    ToUnit = time_unit(ToPrecision),
    erlang:convert_time_unit(Ts, FromUnit, ToUnit).
%% Maps a configured precision to the matching Erlang time unit.
time_unit(ns) ->
    nanosecond;
time_unit(us) ->
    microsecond;
time_unit(ms) ->
    millisecond;
time_unit(s) ->
    second.
%% Fold step that renders one key/value template pair against `Data'.
%% The pair is skipped when the key renders to `undefined' or the value
%% starts with `undefined'; otherwise the binary key is stored with its
%% typed value (see value_type/1).
maps_config_to_data(K, V, {Data, Res}) ->
    KeyParts = emqx_placeholder:proc_tmpl(
        K, Data, #{return => rawlist, var_trans => fun key_filter/1}
    ),
    ValParts = emqx_placeholder:proc_tmpl(
        V, Data, #{return => rawlist, var_trans => fun data_filter/1}
    ),
    case {KeyParts, ValParts} of
        {[undefined], _} ->
            {Data, Res};
        %% undefined value, either plain ([undefined]) or with an int/uint
        %% suffix ([undefined, <<"i">>])
        {_, [undefined | _]} ->
            {Data, Res};
        _ ->
            Key = list_to_binary(KeyParts),
            {Data, Res#{Key => value_type(ValParts)}}
    end.
%% Chooses the client value representation for a rendered field/tag value:
%%   [Int, <<"i">>]  -> int64
%%   [UInt, <<"u">>] -> uint64
%%   [Float]         -> the float itself
%%   t/T/true/TRUE/True and f/F/false/FALSE/False -> boolean
%%   anything else   -> string value carrying the raw list
value_type([Int, <<"i">>]) when is_integer(Int) ->
    greptimedb_values:int64_value(Int);
value_type([UInt, <<"u">>]) when is_integer(UInt) ->
    greptimedb_values:uint64_value(UInt);
value_type([Float]) when is_float(Float) ->
    Float;
value_type([Truthy]) when
    Truthy =:= <<"t">>;
    Truthy =:= <<"T">>;
    Truthy =:= true;
    Truthy =:= <<"TRUE">>;
    Truthy =:= <<"True">>
->
    greptimedb_values:boolean_value(true);
value_type([Falsy]) when
    Falsy =:= <<"f">>;
    Falsy =:= <<"F">>;
    Falsy =:= false;
    Falsy =:= <<"FALSE">>;
    Falsy =:= <<"False">>
->
    greptimedb_values:boolean_value(false);
value_type(Raw) ->
    #{values => #{string_values => Raw, datatype => 'STRING'}}.
%% Variable transformer for keys: `undefined' passes through, any other
%% value is converted to a binary.
key_filter(undefined) ->
    undefined;
key_filter(Present) ->
    emqx_utils_conv:bin(Present).
%% Variable transformer for values: `undefined', numbers and booleans pass
%% through unchanged; everything else is converted to a binary.
data_filter(undefined) ->
    undefined;
data_filter(Num) when is_number(Num) ->
    Num;
data_filter(Flag) when is_boolean(Flag) ->
    Flag;
data_filter(Other) ->
    bin(Other).
%% Shorthand for emqx_utils_conv:bin/1.
bin(Data) ->
    emqx_utils_conv:bin(Data).
%% helper funcs

%% Logs every `{error, Reason}' entry of `Errs' at error level.
log_error_points(InstId, Errs) ->
    LogOne = fun({error, Reason}) ->
        ?SLOG(error, #{
            msg => "greptimedb trans point failed",
            connector => InstId,
            reason => Reason
        })
    end,
    lists:foreach(LogOne, Errs).
%% Schema converter for the `server' field: strips every leading "http://"
%% or "https://" scheme, then hands the rest to
%% emqx_schema:convert_servers/2.
convert_server(<<"https://", HostPort/binary>>, HoconOpts) ->
    convert_server(HostPort, HoconOpts);
convert_server(<<"http://", HostPort/binary>>, HoconOpts) ->
    convert_server(HostPort, HoconOpts);
convert_server(HostPort, HoconOpts) ->
    emqx_schema:convert_servers(HostPort, HoconOpts).
%% Converts an atom or binary to a list; a list is returned unchanged.
%% Binaries are converted byte by byte (binary_to_list/1).
str(Chars) when is_list(Chars) ->
    Chars;
str(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
%% True only for results shaped `{error, {unrecoverable_error, _}}'.
is_unrecoverable_error(Result) ->
    case Result of
        {error, {unrecoverable_error, _Detail}} -> true;
        _ -> false
    end.
%%===================================================================
%% eunit tests
%%===================================================================
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% is_auth_key/1 matches "authorization" case-insensitively and rejects
%% other binaries as well as non-binary keys.
is_auth_key_test_() ->
[
?_assert(is_auth_key(<<"Authorization">>)),
?_assertNot(is_auth_key(<<"Something">>)),
?_assertNot(is_auth_key(89))
].
%% for coverage
%% Checks that the struct descriptions and the `server' field description are
%% `{desc, _, _}' tuples and that the namespace is `connector_greptimedb'.
desc_test_() ->
[
?_assertMatch(
{desc, _, _},
desc(common)
),
?_assertMatch(
{desc, _, _},
desc(greptimedb_grpc_v1)
),
?_assertMatch(
{desc, _, _},
hocon_schema:field_schema(server(), desc)
),
?_assertMatch(
connector_greptimedb,
namespace()
)
].
-endif.

View File

@ -0,0 +1,348 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb_tests).
-include_lib("eunit/include/eunit.hrl").
%% Write-syntax inputs that the tests below expect to_influx_lines/1 to
%% reject by throwing. They are checked one at a time and also joined into
%% multi-line inputs.
-define(INVALID_LINES, [
" ",
" \n",
" \n\n\n ",
"\n",
" \n\n \n \n",
"measurement",
"measurement ",
"measurement,tag",
"measurement field",
"measurement,tag field",
"measurement,tag field ${timestamp}",
"measurement,tag=",
"measurement,tag=tag1",
"measurement,tag =",
"measurement field=",
"measurement field= ",
"measurement field = ",
"measurement, tag = field = ",
"measurement, tag = field = ",
"measurement, tag = tag_val field = field_val",
"measurement, tag = tag_val field = field_val ${timestamp}",
"measurement,= = ${timestamp}",
"measurement,t=a, f=a, ${timestamp}",
"measurement,t=a,t1=b, f=a,f1=b, ${timestamp}",
"measurement,t=a,t1=b, f=a,f1=b,",
"measurement,t=a, t1=b, f=a,f1=b,",
"measurement,t=a,,t1=b, f=a,f1=b,",
"measurement,t=a,,t1=b f=a,,f1=b",
"measurement,t=a,,t1=b f=a,f1=b ${timestamp}",
"measurement, f=a,f1=b",
"measurement, f=a,f1=b ${timestamp}",
"measurement,, f=a,f1=b ${timestamp}",
"measurement,, f=a,f1=b",
"measurement,, f=a,f1=b,, ${timestamp}",
"measurement f=a,f1=b,, ${timestamp}",
"measurement,t=a f=a,f1=b,, ${timestamp}",
"measurement,t=a f=a,f1=b,, ",
"measurement,t=a f=a,f1=b,,",
"measurement, t=a f=a,f1=b",
"measurement,t=a f=a, f1=b",
"measurement,t=a f=a, f1=b ${timestamp}",
"measurement, t=a f=a, f1=b ${timestamp}",
"measurement,t= a f=a,f1=b ${timestamp}",
"measurement,t= a f=a,f1 =b ${timestamp}",
"measurement, t = a f = a,f1 = b ${timestamp}",
"measurement,t=a f=a,f1=b \n ${timestamp}",
"measurement,t=a \n f=a,f1=b \n ${timestamp}",
"measurement,t=a \n f=a,f1=b \n ",
"\n measurement,t=a \n f=a,f1=b \n ${timestamp}",
"\n measurement,t=a \n f=a,f1=b \n",
%% an unescaped backslash inside a quoted field value is invalid
"measurement,tag=1 field=\"val\\1\""
]).
%% `{RawLine, Expected}' pairs fed to test_pairs/1: `RawLine' is a single
%% write-syntax line and `Expected' is the map (measurement, tags, fields,
%% timestamp) that to_influx_lines/1 must produce for it. `timestamp' is
%% `undefined' when the line carries none.
-define(VALID_LINE_PARSED_PAIRS, [
{"m1,tag=tag1 field=field1 ${timestamp1}", #{
measurement => "m1",
tags => [{"tag", "tag1"}],
fields => [{"field", "field1"}],
timestamp => "${timestamp1}"
}},
{"m2,tag=tag2 field=field2", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field2"}],
timestamp => undefined
}},
{"m3 field=field3 ${timestamp3}", #{
measurement => "m3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${timestamp3}"
}},
{"m4 field=field4", #{
measurement => "m4",
tags => [],
fields => [{"field", "field4"}],
timestamp => undefined
}},
{"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}",
#{
measurement => "m5",
tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
timestamp => "${timestamp5}"
}},
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}},
{"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"",
#{
measurement => "m7",
tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
timestamp => undefined
}},
{"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
#{
measurement => "m8",
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
timestamp => "${timestamp8}"
}},
{"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
#{
measurement => "m9",
tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
timestamp => "${timestamp9}"
}},
{"m10 field=\"\" ${timestamp10}", #{
measurement => "m10",
tags => [],
fields => [{"field", ""}],
timestamp => "${timestamp10}"
}}
]).
%% `{RawLine, Expected}' pairs fed to test_pairs/1. Each raw line is padded
%% with extra spaces and/or newlines around the content; the expected maps
%% contain none of that padding.
-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [
{"\n m1,tag=tag1 field=field1 ${timestamp1} \n", #{
measurement => "m1",
tags => [{"tag", "tag1"}],
fields => [{"field", "field1"}],
timestamp => "${timestamp1}"
}},
{" m2,tag=tag2 field=field2 ", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field2"}],
timestamp => undefined
}},
{" m3 field=field3 ${timestamp3} ", #{
measurement => "m3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${timestamp3}"
}},
{" \n m4 field=field4\n ", #{
measurement => "m4",
tags => [],
fields => [{"field", "field4"}],
timestamp => undefined
}},
{" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5} \n",
#{
measurement => "m5",
tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
timestamp => "${timestamp5}"
}},
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b\n ", #{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}}
]).
%% `{RawLine, Expected}' pairs fed to test_pairs/1. The raw lines use
%% backslash escapes (for spaces, commas, equals signs, quotes and
%% backslashes) and quoted field values; the expected maps hold the values
%% to_influx_lines/1 must produce for them.
-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [
{"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{
measurement => "m =1,",
tags => [{",tag =", "=tag 1,"}],
fields => [{",fie ld ", " field,1"}],
timestamp => "${timestamp1}"
}},
{"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field \"2\",\n"}],
timestamp => undefined
}},
{"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
measurement => "m 3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${payload.timestamp 3}"
}},
{"m4 field=\"\\\"field\\\\4\\\"\"", #{
measurement => "m4",
tags => [],
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{
"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
fields => [
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}
},
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
#{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}},
{
"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
"field_a=field7a,field_b=\"field7b\\\\\n\"",
#{
measurement => " m7 ",
tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
timestamp => undefined
}
},
{
"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
"field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
#{
measurement => "m8",
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
timestamp => "${timestamp8}"
}
},
{"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
#{
measurement => "m\\9",
tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
timestamp => "${timestamp9}"
}},
{"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
measurement => "m,10",
tags => [],
%% backslash should not be un-escaped in a field key
fields => [{"\"field\\\\\"", ""}],
timestamp => "${timestamp10}"
}}
]).
%% `{RawLine, Expected}' pairs fed to test_pairs/1. The raw lines combine
%% backslash-escaped characters and quoted field values with extra leading
%% and trailing whitespace; the expected maps contain none of that padding.
-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [
{" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1} ", #{
measurement => "m =1,",
tags => [{",tag =", "=tag 1,"}],
fields => [{",fie ld ", " field,1"}],
timestamp => "${timestamp1}"
}},
{" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field \"2\",\n"}],
timestamp => undefined
}},
{" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{
measurement => "m 3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${payload.timestamp 3}"
}},
{" m4 field=\"\\\"field\\\\4\\\"\" ", #{
measurement => "m4",
tags => [],
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{
" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
fields => [
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}
},
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ",
#{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}}
]).
%% Each entry of ?INVALID_LINES, parsed on its own, must make
%% to_influx_lines/1 throw.
invalid_write_syntax_line_test_() ->
    lists:map(
        fun(Line) -> ?_assertThrow(_, to_influx_lines(Line)) end,
        ?INVALID_LINES
    ).
%% The invalid lines glued together with one, two or three newlines (once in
%% reversed order) must make to_influx_lines/1 throw as well.
invalid_write_syntax_multiline_test_() ->
    Reversed = lists:reverse(?INVALID_LINES),
    [
        ?_assertThrow(_, to_influx_lines(Joined))
     || Joined <- [
            join("\n", ?INVALID_LINES),
            join("\n\n\n", ?INVALID_LINES),
            join("\n\n", Reversed)
        ]
    ].
%% Runs test_pairs/1 over the plain valid lines.
valid_write_syntax_test_() ->
    Pairs = ?VALID_LINE_PARSED_PAIRS,
    test_pairs(Pairs).
%% Runs test_pairs/1 over the valid lines padded with extra whitespace.
valid_write_syntax_with_extra_spaces_test_() ->
    Pairs = ?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS,
    test_pairs(Pairs).
%% Runs test_pairs/1 over the valid lines that contain escaped characters.
valid_write_syntax_escaped_chars_test_() ->
    Pairs = ?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS,
    test_pairs(Pairs).
%% Runs test_pairs/1 over the valid lines that combine escaped characters
%% with extra surrounding whitespace.
valid_write_syntax_escaped_chars_with_extra_spaces_test_() ->
    Pairs = ?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS,
    test_pairs(Pairs).
%% Build the EUnit tests for a list of `{RawLine, Expected}' pairs:
%%  * one test per pair, expecting the single-element list `[Expected]';
%%  * three tests on all raw lines joined into one input (separated by one
%%    newline, by three newlines, and by two newlines in reversed order),
%%    expecting the parsed results in the matching order.
test_pairs(PairsList) ->
    {RawLines, ParsedLines} = lists:unzip(PairsList),
    OneNewline = join("\n", RawLines),
    ThreeNewlines = join("\n\n\n", RawLines),
    ReversedTwoNewlines = join("\n\n", lists:reverse(RawLines)),
    PerLineTests = lists:map(
        fun({Line, Expected}) ->
            ?_assertEqual([Expected], to_influx_lines(Line))
        end,
        PairsList
    ),
    PerLineTests ++
        [
            ?_assertEqual(ParsedLines, to_influx_lines(OneNewline)),
            ?_assertEqual(ParsedLines, to_influx_lines(ThreeNewlines)),
            ?_assertEqual(lists:reverse(ParsedLines), to_influx_lines(ReversedTwoNewlines))
        ].
%% Concatenate the strings in `LinesList' into one flat string, placing
%% `Sep' between consecutive elements.
join(Sep, LinesList) ->
    Interleaved = lists:join(Sep, LinesList),
    lists:flatten(Interleaved).
%% Test wrapper around emqx_bridge_greptimedb:to_influx_lines/1: the primary
%% log level is set to `none' for the duration of the call and the level read
%% beforehand is put back afterwards, whether the call returns or raises.
to_influx_lines(RawLines) ->
    PreviousLevel = emqx_logger:get_primary_log_level(),
    try
        %% silence the error logs emitted while parsing
        emqx_logger:set_primary_log_level(none),
        emqx_bridge_greptimedb:to_influx_lines(RawLines)
    of
        Parsed -> Parsed
    after
        emqx_logger:set_primary_log_level(PreviousLevel)
    end.