diff --git a/apps/emqx_bridge_greptimedb/rebar.config b/apps/emqx_bridge_greptimedb/rebar.config index cbde4660f..952281286 100644 --- a/apps/emqx_bridge_greptimedb/rebar.config +++ b/apps/emqx_bridge_greptimedb/rebar.config @@ -3,11 +3,10 @@ ]}. {deps, [ - {emqx, {path, "../../apps/emqx"}}, - {emqx_connector, {path, "../../apps/emqx_connector"}}, - {emqx_resource, {path, "../../apps/emqx_resource"}}, - {emqx_bridge, {path, "../../apps/emqx_bridge"}}, - {greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.1"}}} + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}}, + {greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.1"}}} ]}. {plugins, [rebar3_path_deps]}. {project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl index ffb0e39c7..f37ddf320 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl @@ -1,3 +1,298 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- -module(emqx_bridge_greptimedb). --export([]). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-type write_syntax() :: list(). +-reflect_type([write_syntax/0]). +-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). +-export([to_influx_lines/1]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"greptimedb_grpc_v1">> => #{ + summary => <<"Greptimedb HTTP API V2 Bridge">>, + value => values("greptimedb_grpc_v1", Method) + } + } + ]. + +values(Protocol, get) -> + values(Protocol, post); +values("greptimedb_grpc_v1", post) -> + SupportUint = <<"uint_value=${payload.uint_key}u,">>, + TypeOpts = #{ + bucket => <<"example_bucket">>, + org => <<"examlpe_org">>, + token => <<"example_token">>, + server => <<"127.0.0.1:4000">> + }, + values(common, "greptimedb_grpc_v1", SupportUint, TypeOpts); +values(Protocol, put) -> + values(Protocol, post). + +values(common, Protocol, SupportUint, TypeOpts) -> + CommonConfigs = #{ + type => list_to_atom(Protocol), + name => <<"demo">>, + enable => true, + local_topic => <<"local/topic/#">>, + write_syntax => + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, + "bool=${payload.bool}">>, + precision => ms, + resource_opts => #{ + batch_size => 100, + batch_time => <<"20ms">> + }, + server => <<"127.0.0.1:4000">>, + ssl => #{enable => false} + }, + maps:merge(TypeOpts, CommonConfigs). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_greptimedb". + +roots() -> []. + +fields("post_grpc_v1") -> + method_fields(post, greptimedb_grpc_v1); +fields("put_grpc_v1") -> + method_fields(put, greptimedb_grpc_v1); +fields("get_grpc_v1") -> + method_fields(get, greptimedb_grpc_v1); +fields(Type) when + Type == greptimedb_grpc_v1 +-> + greptimedb_bridge_common_fields() ++ + connector_fields(Type). + +method_fields(post, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType); +method_fields(get, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType) ++ + emqx_bridge_schema:status_fields(); +method_fields(put, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType). + +greptimedb_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {write_syntax, fun write_syntax/1} + ] ++ + emqx_resource_schema:fields("resource_opts"). + +connector_fields(Type) -> + emqx_bridge_greptimedb_connector:fields(Type). + +type_name_fields(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Greptimedb using `", string:to_upper(Method), "` method."]; +desc(greptimedb_grpc_v1) -> + ?DESC(emqx_bridge_greptimedb_connector, "greptimedb_grpc_v1"); +desc(_) -> + undefined. + +write_syntax(type) -> + ?MODULE:write_syntax(); +write_syntax(required) -> + true; +write_syntax(validator) -> + [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; +write_syntax(converter) -> + fun to_influx_lines/1; +write_syntax(desc) -> + ?DESC("write_syntax"); +write_syntax(format) -> + <<"sql">>; +write_syntax(_) -> + undefined. + +to_influx_lines(RawLines) -> + try + influx_lines(str(RawLines), []) + catch + _:Reason:Stacktrace -> + Msg = lists:flatten( + io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines]) + ), + ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}), + throw(Msg) + end. + +-define(MEASUREMENT_ESC_CHARS, [$,, $\s]). +-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]). +-define(FIELD_VAL_ESC_CHARS, [$", $\\]). +% Common separator for both tags and fields +-define(SEP, $\s). +-define(MEASUREMENT_TAG_SEP, $,). +-define(KEY_SEP, $=). +-define(VAL_SEP, $,). +-define(NON_EMPTY, [_ | _]). + +influx_lines([] = _RawLines, Acc) -> + ?NON_EMPTY = lists:reverse(Acc); +influx_lines(RawLines, Acc) -> + {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc), + influx_lines(RawLines1, Acc1). + +influx_line([], Acc) -> + {Acc, []}; +influx_line(Line, Acc) -> + {?NON_EMPTY = Measurement, Line1} = measurement(Line), + {Tags, Line2} = tags(Line1), + {?NON_EMPTY = Fields, Line3} = influx_fields(Line2), + {Timestamp, Line4} = timestamp(Line3), + { + [ + #{ + measurement => Measurement, + tags => Tags, + fields => Fields, + timestamp => Timestamp + } + | Acc + ], + Line4 + }. + +measurement(Line) -> + unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []). + +tags([?MEASUREMENT_TAG_SEP | Line]) -> + tags1(Line, []); +tags(Line) -> + {[], Line}. + +%% Empty line is invalid as fields are required after tags, +%% need to break recursion here and fail later on parsing fields +tags1([] = Line, Acc) -> + {lists:reverse(Acc), Line}; +%% Matching non empty Acc treats lines like "m, field=field_val" invalid +tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) -> + {lists:reverse(Acc), Line}; +tags1(Line, Acc) -> + {Tag, Line1} = tag(Line), + tags1(Line1, [Tag | Acc]). + +tag(Line) -> + {?NON_EMPTY = Key, Line1} = key(Line), + {?NON_EMPTY = Val, Line2} = tag_val(Line1), + {{Key, Val}, Line2}. + +tag_val(Line) -> + {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []), + {Val, strip_l(Line1, ?VAL_SEP)}. + +influx_fields([?SEP | Line]) -> + fields1(string:trim(Line, leading, "\s"), []). + +%% Timestamp is optional, so fields may be at the very end of the line +fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n -> + {lists:reverse(Acc), Line}; +fields1([] = Line, Acc) -> + {lists:reverse(Acc), Line}; +fields1(Line, Acc) -> + {Field, Line1} = field(Line), + fields1(Line1, [Field | Acc]). + +field(Line) -> + {?NON_EMPTY = Key, Line1} = key(Line), + {Val, Line2} = field_val(Line1), + {{Key, Val}, Line2}. + +field_val([$" | Line]) -> + {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []), + %% Quoted val can be empty + {Val, strip_l(Line1, ?VAL_SEP)}; +field_val(Line) -> + %% Unquoted value should not be un-escaped according to Greptimedb protocol, + %% as it can only hold float, integer, uinteger or boolean value. + %% However, as templates are possible, un-escaping is applied here, + %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}" + {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []), + {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}. + +timestamp([?SEP | Line]) -> + Line1 = string:trim(Line, leading, "\s"), + %% Similarly to unquoted field value, un-escape a timestamp to validate and handle + %% potentially escaped characters in a template + {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []), + {timestamp1(T), Line2}; +timestamp(Line) -> + {undefined, Line}. + +timestamp1(?NON_EMPTY = Ts) -> Ts; +timestamp1(_Ts) -> undefined. + +%% Common for both tag and field keys +key(Line) -> + {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []), + {Key, strip_l(Line1, ?KEY_SEP)}. + +%% Only strip a character between pairs, don't strip it(and let it fail) +%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val +strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP -> + [Ch1 | Str]; +strip_l(Str, _Ch) -> + Str. + +unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) -> + ShouldEscapeBackslash = lists:member($\\, EscapeChars), + Acc1 = + case lists:member(Char, EscapeChars) of + true -> [Char | Acc]; + false when not ShouldEscapeBackslash -> [Char, $\\ | Acc] + end, + unescape(EscapeChars, SepChars, T, Acc1); +unescape(EscapeChars, SepChars, [Char | T] = L, Acc) -> + IsEscapeChar = lists:member(Char, EscapeChars), + case lists:member(Char, SepChars) of + true -> {lists:reverse(Acc), L}; + false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc]) + end; +unescape(_EscapeChars, _SepChars, [] = L, Acc) -> + {lists:reverse(Acc), L}. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 17c4d9a3c..a02df09c5 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -1,84 +1,179 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- -module(emqx_bridge_greptimedb_connector). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -%% `emqx_resource' API +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource -export([ callback_mode/0, on_start/2, on_stop/2, - on_get_status/2, on_query/3, - on_query_async/4, on_batch_query/3, - on_batch_query_async/4 + on_get_status/2 ]). --define(GREPTIMEDB_DEFAULT_PORT, 4001). +-export([ + roots/0, + namespace/0, + fields/1, + desc/1 +]). + +%% only for test +-export([is_unrecoverable_error/1]). + +-type ts_precision() :: ns | us | ms | s. + +%% Allocatable resources +-define(greptime_client, greptime_client). + +-define(GREPTIMEDB_DEFAULT_PORT, 4000). + +-define(DEFAULT_DB, <<"public">>). -define(GREPTIMEDB_HOST_OPTIONS, #{ default_port => ?GREPTIMEDB_DEFAULT_PORT }). -%%------------------------------------------------------------------------------------- -%% `emqx_resource' API -%%------------------------------------------------------------------------------------- -callback_mode() -> async_if_possible. +-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). + +%% ------------------------------------------------------------------------------------------------- +%% resource callback +callback_mode() -> always_sync. on_start(InstId, Config) -> + %% InstID as pool would be handled by greptimedb client + %% so there is no need to allocate pool_name here + %% See: greptimedb:start_client/1 start_client(InstId, Config). -on_stop(_InstId, #{client := Client}) -> - greptimedb:stop_client(Client). +on_stop(InstId, _State) -> + case emqx_resource:get_allocated_resources(InstId) of + #{?greptime_client := Client} -> + greptimedb:stop_client(Client); + _ -> + ok + end. -on_get_status(_InstId, _State) -> - %% FIXME - connected. +on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => false, mode => sync} + ), + do_query(InstId, Client, Points); + {error, ErrorPoints} -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => false, mode => sync, error => ErrorPoints} + ), + log_error_points(InstId, ErrorPoints), + {error, {unrecoverable_error, ErrorPoints}} + end. -on_query(_InstanceId, {send_message, _Message}, _State) -> - todo. +%% Once a Batched Data trans to points failed. +%% This batch query failed +on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => true, mode => sync} + ), + do_query(InstId, Client, Points); + {error, Reason} -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => true, mode => sync, error => Reason} + ), + {error, {unrecoverable_error, Reason}} + end. -on_query_async(_InstanceId, {send_message, _Message}, _ReplyFunAndArgs0, _State) -> - todo. +on_get_status(_InstId, #{client := Client}) -> + case greptimedb:is_alive(Client) of + true -> + connected; + false -> + disconnected + end. -on_batch_query( - _ResourceID, - _BatchReq, - _State -) -> - todo. +%% ------------------------------------------------------------------------------------------------- +%% schema +namespace() -> connector_greptimedb. -on_batch_query_async( - _InstId, - _BatchData, - {_ReplyFun, _Args}, - _State -) -> - todo. +roots() -> + [ + {config, #{ + type => hoconsc:union( + [ + hoconsc:ref(?MODULE, greptimedb_grpc_v1) + ] + ) + }} + ]. +fields(common) -> + [ + {server, server()}, + {precision, + %% The greptimedb only supports these 4 precision: + %% See "https://github.com/influxdata/greptimedb/blob/ + %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for + %% more information. + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })} + ]; +fields(greptimedb_grpc_v1) -> + fields(common) ++ + [ + {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})} + ] ++ emqx_connector_schema_lib:ssl_fields(). + +server() -> + Meta = #{ + required => false, + default => <<"127.0.0.1:4000">>, + desc => ?DESC("server"), + converter => fun convert_server/2 + }, + emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS). + +desc(common) -> + ?DESC("common"); +desc(greptimedb_grpc_v1) -> + ?DESC("greptimedb_grpc_v1"). + +%% ------------------------------------------------------------------------------------------------- %% internal functions start_client(InstId, Config) -> ClientConfig = client_config(InstId, Config), ?SLOG(info, #{ - msg => "starting GreptimeDB connector", + msg => "starting greptimedb connector", connector => InstId, config => emqx_utils:redact(Config), client_config => emqx_utils:redact(ClientConfig) }), - try - case greptimedb:start_client(ClientConfig) of - {ok, Client} -> - {ok, #{client => Client}}; - {error, Reason} -> - ?tp(greptimedb_connector_start_failed, #{error => Reason}), - ?SLOG(warning, #{ - msg => "failed_to_start_greptimedb_connector", - connector => InstId, - reason => Reason - }), - {error, Reason} - end + try do_start_client(InstId, ClientConfig, Config) of + Res = {ok, #{client := Client}} -> + ok = emqx_resource:allocate_resource(InstId, ?greptime_client, Client), + Res; + {error, Reason} -> + {error, Reason} catch E:R:S -> ?tp(greptimedb_connector_start_exception, #{error => {E, R}}), @@ -92,9 +187,64 @@ start_client(InstId, Config) -> {error, R} end. +do_start_client( + InstId, + ClientConfig, + Config = #{write_syntax := Lines} +) -> + Precision = maps:get(precision, Config, ms), + case greptimedb:start_client(ClientConfig) of + {ok, Client} -> + case greptimedb:is_alive(Client, true) of + true -> + State = #{ + client => Client, + dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB), + write_syntax => to_config(Lines, Precision) + }, + ?SLOG(info, #{ + msg => "starting greptimedb connector success", + connector => InstId, + client => redact_auth(Client), + state => redact_auth(State) + }), + {ok, State}; + {false, Reason} -> + ?tp(greptimedb_connector_start_failed, #{ + error => greptimedb_client_not_alive, reason => Reason + }), + ?SLOG(warning, #{ + msg => "failed_to_start_greptimedb_connector", + connector => InstId, + client => redact_auth(Client), + reason => Reason + }), + %% no leak + _ = greptimedb:stop_client(Client), + {error, greptimedb_client_not_alive} + end; + {error, {already_started, Client0}} -> + ?tp(greptimedb_connector_start_already_started, #{}), + ?SLOG(info, #{ + msg => "restarting greptimedb connector, found already started client", + connector => InstId, + old_client => redact_auth(Client0) + }), + _ = greptimedb:stop_client(Client0), + do_start_client(InstId, ClientConfig, Config); + {error, Reason} -> + ?tp(greptimedb_connector_start_failed, #{error => Reason}), + ?SLOG(warning, #{ + msg => "failed_to_start_greptimedb_connector", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. + client_config( InstId, - _Config = #{ + Config = #{ server := Server } ) -> @@ -103,12 +253,369 @@ client_config( {endpoints, [{http, str(Host), Port}]}, {pool_size, erlang:system_info(schedulers)}, {pool, InstId}, - {pool_type, random} + {pool_type, random}, + {timeunit, maps:get(precision, Config, ms)} + ] ++ protocol_config(Config). + +protocol_config( + #{ + dbname := DbName, + ssl := SSL + } = Config +) -> + [ + {dbname, DbName} + ] ++ auth(Config) ++ + ssl_config(SSL). + +ssl_config(#{enable := false}) -> + [ + {https_enabled, false} + ]; +ssl_config(SSL = #{enable := true}) -> + [ + {https_enabled, true}, + {transport, ssl}, + {transport_opts, emqx_tls_lib:to_client_opts(SSL)} ]. +auth(#{username := Username, password := Password}) -> + [ + {auth, {basic, #{username => Username, password => Password}}} + ]; +auth(_) -> + []. + +redact_auth(Term) -> + emqx_utils:redact(Term, fun is_auth_key/1). + +is_auth_key(Key) when is_binary(Key) -> + string:equal("authorization", Key, true); +is_auth_key(_) -> + false. + +%% ------------------------------------------------------------------------------------------------- +%% Query +do_query(InstId, Client, Points) -> + case greptimedb:write_batch(Client, Points) of + {ok, _} -> + ?SLOG(debug, #{ + msg => "greptimedb write point success", + connector => InstId, + points => Points + }); + {error, {401, _, _}} -> + ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}), + ?SLOG(error, #{ + msg => "greptimedb_authorization_failed", + client => redact_auth(Client), + connector => InstId + }), + {error, {unrecoverable_error, <<"authorization failure">>}}; + {error, Reason} = Err -> + ?tp(greptimedb_connector_do_query_failure, #{error => Reason}), + ?SLOG(error, #{ + msg => "greptimedb write point failed", + connector => InstId, + reason => Reason + }), + case is_unrecoverable_error(Err) of + true -> + {error, {unrecoverable_error, Reason}}; + false -> + {error, {recoverable_error, Reason}} + end + end. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Config Trans + +to_config(Lines, Precision) -> + to_config(Lines, [], Precision). + +to_config([], Acc, _Precision) -> + lists:reverse(Acc); +to_config([Item0 | Rest], Acc, Precision) -> + Ts0 = maps:get(timestamp, Item0, undefined), + {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), + Item = #{ + measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)), + timestamp => Ts, + precision => {FromPrecision, ToPrecision}, + tags => to_kv_config(maps:get(tags, Item0)), + fields => to_kv_config(maps:get(fields, Item0)) + }, + to_config(Rest, [Item | Acc], Precision). + +%% pre-process the timestamp template +%% returns a tuple of three elements: +%% 1. The timestamp template itself. +%% 2. The source timestamp precision (ms if the template ${timestamp} is used). +%% 3. The target timestamp precision (configured for the client). +preproc_tmpl_timestamp(undefined, Precision) -> + %% not configured, we default it to the message timestamp + preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision); +preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> + %% a const value is used which is very much unusual, but we have to add a special handling + {Ts, Precision, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> + preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); +preproc_tmpl_timestamp(<> = Ts, Precision) -> + {emqx_placeholder:preproc_tmpl(Ts), ms, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> + %% a placehold is in use. e.g. ${payload.my_timestamp} + %% we can only hope it the value will be of the same precision in the configs + {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}. + +to_kv_config(KVfields) -> + maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). + +to_maps_config(K, V, Res) -> + NK = emqx_placeholder:preproc_tmpl(bin(K)), + NV = emqx_placeholder:preproc_tmpl(bin(V)), + Res#{NK => NV}. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Data Trans +parse_batch_data(InstId, BatchData, SyntaxLines) -> + {Points, Errors} = lists:foldl( + fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + {[Points | ListOfPoints], ErrAccIn}; + {error, ErrorPoints} -> + log_error_points(InstId, ErrorPoints), + {ListOfPoints, ErrAccIn + 1} + end + end, + {[], 0}, + BatchData + ), + case Errors of + 0 -> + {ok, lists:flatten(Points)}; + _ -> + ?SLOG(error, #{ + msg => io_lib:format("Greptimedb trans point failed, count: ~p", [Errors]), + connector => InstId, + reason => points_trans_failed + }), + {error, points_trans_failed} + end. + +-spec data_to_points(map(), [ + #{ + fields := [{binary(), binary()}], + measurement := binary(), + tags := [{binary(), binary()}], + timestamp := emqx_placeholder:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} + } +]) -> {ok, [map()]} | {error, term()}. +data_to_points(Data, SyntaxLines) -> + lines_to_points(Data, SyntaxLines, [], []). + +%% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated. +%% And once a row fails to convert, all of them are considered to have failed. +lines_to_points(_, [], Points, ErrorPoints) -> + case ErrorPoints of + [] -> + {ok, Points}; + _ -> + %% ignore trans succeeded points + {error, ErrorPoints} + end; +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_list(Ts) +-> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of + {ok, TsInt} -> + Item1 = Item#{timestamp => TsInt}, + continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); + {error, BadTs} -> + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, {bad_timestamp, BadTs}} | ErrorPointsAcc + ]) + end; +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_integer(Ts) +-> + continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). + +parse_timestamp([TsInt]) when is_integer(TsInt) -> + {ok, TsInt}; +parse_timestamp([TsBin]) -> + try + {ok, binary_to_integer(TsBin)} + catch + _:_ -> + {error, TsBin} + end. + +continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> + case line_to_point(Data, Item) of + #{fields := Fields} when map_size(Fields) =:= 0 -> + %% greptimedb client doesn't like empty field maps... + ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc], + lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1); + Point -> + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) + end. + +line_to_point( + Data, + #{ + measurement := Measurement, + tags := Tags, + fields := Fields, + timestamp := Ts, + precision := Precision + } = Item +) -> + {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + TableName = emqx_placeholder:proc_tmpl(Measurement, Data), + {TableName, [ + maps:without([precision], Item#{ + tags => EncodedTags, + fields => EncodedFields, + timestamp => maybe_convert_time_unit(Ts, Precision) + }) + ]}. + +maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) -> + erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)). + +time_unit(s) -> second; +time_unit(ms) -> millisecond; +time_unit(us) -> microsecond; +time_unit(ns) -> nanosecond. + +maps_config_to_data(K, V, {Data, Res}) -> + KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, + VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), + case {NK0, NV} of + {[undefined], _} -> + {Data, Res}; + %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] + {_, [undefined | _]} -> + {Data, Res}; + _ -> + NK = list_to_binary(NK0), + {Data, Res#{NK => value_type(NV)}} + end. + +value_type([Int, <<"i">>]) when + is_integer(Int) +-> + greptimedb_values:int64_value(Int); +value_type([UInt, <<"u">>]) when + is_integer(UInt) +-> + greptimedb_values:uint64_value(UInt); +value_type([Float]) when is_float(Float) -> + Float; +value_type([<<"t">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"T">>]) -> + greptimedb_values:boolean_value(true); +value_type([true]) -> + greptimedb_values:boolean_value(true); +value_type([<<"TRUE">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"True">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"f">>]) -> + greptimedb_values:boolean_value(false); +value_type([<<"F">>]) -> + greptimedb_values:boolean_value(false); +value_type([false]) -> + greptimedb_values:boolean_value(false); +value_type([<<"FALSE">>]) -> + greptimedb_values:boolean_value(false); +value_type([<<"False">>]) -> + greptimedb_values:boolean_value(false); +value_type(Val) -> + #{values => #{string_values => Val, datatype => 'STRING'}}. + +key_filter(undefined) -> undefined; +key_filter(Value) -> emqx_utils_conv:bin(Value). + +data_filter(undefined) -> undefined; +data_filter(Int) when is_integer(Int) -> Int; +data_filter(Number) when is_number(Number) -> Number; +data_filter(Bool) when is_boolean(Bool) -> Bool; +data_filter(Data) -> bin(Data). + +bin(Data) -> emqx_utils_conv:bin(Data). + +%% helper funcs +log_error_points(InstId, Errs) -> + lists:foreach( + fun({error, Reason}) -> + ?SLOG(error, #{ + msg => "greptimedb trans point failed", + connector => InstId, + reason => Reason + }) + end, + Errs + ). + +convert_server(<<"http://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(<<"https://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(Server, HoconOpts) -> + emqx_schema:convert_servers(Server, HoconOpts). + str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) -> binary_to_list(B); str(S) when is_list(S) -> S. + +is_unrecoverable_error({error, {unrecoverable_error, _}}) -> + true; +is_unrecoverable_error(_) -> + false. + +%%=================================================================== +%% eunit tests +%%=================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +is_auth_key_test_() -> + [ + ?_assert(is_auth_key(<<"Authorization">>)), + ?_assertNot(is_auth_key(<<"Something">>)), + ?_assertNot(is_auth_key(89)) + ]. + +%% for coverage +desc_test_() -> + [ + ?_assertMatch( + {desc, _, _}, + desc(common) + ), + ?_assertMatch( + {desc, _, _}, + desc(greptimedb_grpc_v1) + ), + ?_assertMatch( + {desc, _, _}, + hocon_schema:field_schema(server(), desc) + ), + ?_assertMatch( + connector_greptimedb, + namespace() + ) + ]. +-endif. diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl new file mode 100644 index 000000000..a07ccd92d --- /dev/null +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl @@ -0,0 +1,348 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(INVALID_LINES, [ + " ", + " \n", + " \n\n\n ", + "\n", + " \n\n \n \n", + "measurement", + "measurement ", + "measurement,tag", + "measurement field", + "measurement,tag field", + "measurement,tag field ${timestamp}", + "measurement,tag=", + "measurement,tag=tag1", + "measurement,tag =", + "measurement field=", + "measurement field= ", + "measurement field = ", + "measurement, tag = field = ", + "measurement, tag = field = ", + "measurement, tag = tag_val field = field_val", + "measurement, tag = tag_val field = field_val ${timestamp}", + "measurement,= = ${timestamp}", + "measurement,t=a, f=a, ${timestamp}", + "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}", + "measurement,t=a,t1=b, f=a,f1=b,", + "measurement,t=a, t1=b, f=a,f1=b,", + "measurement,t=a,,t1=b, f=a,f1=b,", + "measurement,t=a,,t1=b f=a,,f1=b", + "measurement,t=a,,t1=b f=a,f1=b ${timestamp}", + "measurement, f=a,f1=b", + "measurement, f=a,f1=b ${timestamp}", + "measurement,, f=a,f1=b ${timestamp}", + "measurement,, f=a,f1=b", + "measurement,, f=a,f1=b,, ${timestamp}", + "measurement f=a,f1=b,, ${timestamp}", + "measurement,t=a f=a,f1=b,, ${timestamp}", + "measurement,t=a f=a,f1=b,, ", + "measurement,t=a f=a,f1=b,,", + "measurement, t=a f=a,f1=b", + "measurement,t=a f=a, f1=b", + "measurement,t=a f=a, f1=b ${timestamp}", + "measurement, t=a f=a, f1=b ${timestamp}", + "measurement,t= a f=a,f1=b ${timestamp}", + "measurement,t= a f=a,f1 =b ${timestamp}", + "measurement, t = a f = a,f1 = b ${timestamp}", + "measurement,t=a f=a,f1=b \n ${timestamp}", + "measurement,t=a \n f=a,f1=b \n ${timestamp}", + "measurement,t=a \n f=a,f1=b \n ", + "\n measurement,t=a \n f=a,f1=b \n ${timestamp}", + "\n measurement,t=a \n f=a,f1=b \n", + %% not escaped backslash in a quoted field value is invalid + "measurement,tag=1 field=\"val\\1\"" +]). + +-define(VALID_LINE_PARSED_PAIRS, [ + {"m1,tag=tag1 field=field1 ${timestamp1}", #{ + measurement => "m1", + tags => [{"tag", "tag1"}], + fields => [{"field", "field1"}], + timestamp => "${timestamp1}" + }}, + {"m2,tag=tag2 field=field2", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field2"}], + timestamp => undefined + }}, + {"m3 field=field3 ${timestamp3}", #{ + measurement => "m3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${timestamp3}" + }}, + {"m4 field=field4", #{ + measurement => "m4", + tags => [], + fields => [{"field", "field4"}], + timestamp => undefined + }}, + {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}", + #{ + measurement => "m5", + tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], + fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], + timestamp => "${timestamp5}" + }}, + {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }}, + {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"", + #{ + measurement => "m7", + tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}], + fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}], + timestamp => undefined + }}, + {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}", + #{ + measurement => "m8", + tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], + fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}], + timestamp => "${timestamp8}" + }}, + {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", + #{ + measurement => "m9", + tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], + fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + timestamp => "${timestamp9}" + }}, + {"m10 field=\"\" ${timestamp10}", #{ + measurement => "m10", + tags => [], + fields => [{"field", ""}], + timestamp => "${timestamp10}" + }} +]). + +-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [ + {"\n m1,tag=tag1 field=field1 ${timestamp1} \n", #{ + measurement => "m1", + tags => [{"tag", "tag1"}], + fields => [{"field", "field1"}], + timestamp => "${timestamp1}" + }}, + {" m2,tag=tag2 field=field2 ", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field2"}], + timestamp => undefined + }}, + {" m3 field=field3 ${timestamp3} ", #{ + measurement => "m3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${timestamp3}" + }}, + {" \n m4 field=field4\n ", #{ + measurement => "m4", + tags => [], + fields => [{"field", "field4"}], + timestamp => undefined + }}, + {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5} \n", + #{ + measurement => "m5", + tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], + fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], + timestamp => "${timestamp5}" + }}, + {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b\n ", #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }} +]). + +-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [ + {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{ + measurement => "m =1,", + tags => [{",tag =", "=tag 1,"}], + fields => [{",fie ld ", " field,1"}], + timestamp => "${timestamp1}" + }}, + {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field \"2\",\n"}], + timestamp => undefined + }}, + {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{ + measurement => "m 3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${payload.timestamp 3}" + }}, + {"m4 field=\"\\\"field\\\\4\\\"\"", #{ + measurement => "m4", + tags => [], + fields => [{"field", "\"field\\4\""}], + timestamp => undefined + }}, + { + "m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," + "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}", + #{ + measurement => "m5,mA", + tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], + fields => [ + {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} + ], + timestamp => "${timestamp5}" + } + }, + {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"", + #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }}, + { + "\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\"," + "field_a=field7a,field_b=\"field7b\\\\\n\"", + #{ + measurement => " m7 ", + tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}], + fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}], + timestamp => undefined + } + }, + { + "m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a," + "field_b=\"\\\"field\\\" = 8b\" ${timestamp8}", + #{ + measurement => "m8", + tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], + fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}], + timestamp => "${timestamp8}" + } + }, + {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", + #{ + measurement => "m\\9", + tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], + fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + timestamp => "${timestamp9}" + }}, + {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{ + measurement => "m,10", + tags => [], + %% backslash should not be un-escaped in tag key + fields => [{"\"field\\\\\"", ""}], + timestamp => "${timestamp10}" + }} +]). + +-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [ + {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1} ", #{ + measurement => "m =1,", + tags => [{",tag =", "=tag 1,"}], + fields => [{",fie ld ", " field,1"}], + timestamp => "${timestamp1}" + }}, + {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field \"2\",\n"}], + timestamp => undefined + }}, + {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{ + measurement => "m 3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${payload.timestamp 3}" + }}, + {" m4 field=\"\\\"field\\\\4\\\"\" ", #{ + measurement => "m4", + tags => [], + fields => [{"field", "\"field\\4\""}], + timestamp => undefined + }}, + { + " m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," + "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ", + #{ + measurement => "m5,mA", + tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], + fields => [ + {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} + ], + timestamp => "${timestamp5}" + } + }, + {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ", + #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }} +]). + +invalid_write_syntax_line_test_() -> + [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES]. + +invalid_write_syntax_multiline_test_() -> + LinesList = [ + join("\n", ?INVALID_LINES), + join("\n\n\n", ?INVALID_LINES), + join("\n\n", lists:reverse(?INVALID_LINES)) + ], + [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList]. + +valid_write_syntax_test_() -> + test_pairs(?VALID_LINE_PARSED_PAIRS). + +valid_write_syntax_with_extra_spaces_test_() -> + test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS). + +valid_write_syntax_escaped_chars_test_() -> + test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS). + +valid_write_syntax_escaped_chars_with_extra_spaces_test_() -> + test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS). + +test_pairs(PairsList) -> + {Lines, AllExpected} = lists:unzip(PairsList), + JoinedLines = join("\n", Lines), + JoinedLines1 = join("\n\n\n", Lines), + JoinedLines2 = join("\n\n", lists:reverse(Lines)), + SingleLineTests = + [ + ?_assertEqual([Expected], to_influx_lines(Line)) + || {Line, Expected} <- PairsList + ], + JoinedLinesTests = + [ + ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)), + ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)), + ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2)) + ], + SingleLineTests ++ JoinedLinesTests. + +join(Sep, LinesList) -> + lists:flatten(lists:join(Sep, LinesList)). + +to_influx_lines(RawLines) -> + OldLevel = emqx_logger:get_primary_log_level(), + try + %% mute error logs from this call + emqx_logger:set_primary_log_level(none), + emqx_bridge_greptimedb:to_influx_lines(RawLines) + after + emqx_logger:set_primary_log_level(OldLevel) + end.