fix(greptimedb_bridge): avoid double-parsing write syntax during probe and remove code duplication

Fixes https://emqx.atlassian.net/browse/EMQX-10843
This commit is contained in:
Thales Macedo Garitezi 2023-08-22 10:16:03 -03:00
parent 7b8a599d17
commit 45e2e687e5
7 changed files with 102 additions and 510 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -21,11 +21,6 @@
desc/1
]).
-type write_syntax() :: list().
-reflect_type([write_syntax/0]).
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
-export([to_influx_lines/1]).
%% -------------------------------------------------------------------------------------------------
%% api
@ -131,169 +126,16 @@ desc(_) ->
undefined.
write_syntax(type) ->
?MODULE:write_syntax();
emqx_bridge_influxdb:write_syntax();
write_syntax(required) ->
true;
write_syntax(validator) ->
[?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
write_syntax(converter) ->
fun to_influx_lines/1;
fun emqx_bridge_influxdb:to_influx_lines/1;
write_syntax(desc) ->
?DESC("write_syntax");
write_syntax(format) ->
<<"sql">>;
write_syntax(_) ->
undefined.
%% Parse one or more Influx/Greptime line-protocol lines into a list of
%% point maps (#{measurement, tags, fields, timestamp}).
%% Accepts an atom, binary or char list (normalized by str/1); throws a
%% flat error string on any parse failure so callers can surface it as a
%% validation error.
to_influx_lines(RawLines) ->
    try
        influx_lines(str(RawLines), [])
    catch
        %% Any crash inside the hand-written parser (badmatch on the
        %% ?NON_EMPTY assertions, function_clause/case_clause in
        %% unescape/4, ...) is treated as "invalid input".
        _:Reason:Stacktrace ->
            Msg = lists:flatten(
                io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines])
            ),
            ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}),
            throw(Msg)
    end.
%% Characters that may appear backslash-escaped in each syntactic position.
-define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
-define(FIELD_VAL_ESC_CHARS, [$", $\\]).
% Common separator for both tags and fields
-define(SEP, $\s).
%% Structural separators of the line protocol.
-define(MEASUREMENT_TAG_SEP, $,).
-define(KEY_SEP, $=).
-define(VAL_SEP, $,).
%% Matches (and thereby asserts, via badmatch) a non-empty list.
-define(NON_EMPTY, [_ | _]).
%% Parse all lines, accumulating parsed points in reverse; the final
%% ?NON_EMPTY match asserts that at least one point was produced.
influx_lines([] = _RawLines, Acc) ->
    ?NON_EMPTY = lists:reverse(Acc);
influx_lines(RawLines, Acc) ->
    %% Leading spaces/newlines between lines are insignificant.
    {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc),
    influx_lines(RawLines1, Acc1).

%% Parse a single line into a point map.  Measurement and fields are
%% mandatory (asserted non-empty); tags and timestamp are optional.
influx_line([], Acc) ->
    {Acc, []};
influx_line(Line, Acc) ->
    {?NON_EMPTY = Measurement, Line1} = measurement(Line),
    {Tags, Line2} = tags(Line1),
    {?NON_EMPTY = Fields, Line3} = influx_fields(Line2),
    {Timestamp, Line4} = timestamp(Line3),
    {
        [
            #{
                measurement => Measurement,
                tags => Tags,
                fields => Fields,
                timestamp => Timestamp
            }
            | Acc
        ],
        Line4
    }.
%% Measurement name ends at the first unescaped ',' (tags follow) or
%% ' ' (fields follow).
measurement(Line) ->
    unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []).

%% Tags are present only when the measurement is followed by a ','.
tags([?MEASUREMENT_TAG_SEP | Line]) ->
    tags1(Line, []);
tags(Line) ->
    {[], Line}.

%% Empty line is invalid as fields are required after tags,
%% need to break recursion here and fail later on parsing fields
tags1([] = Line, Acc) ->
    {lists:reverse(Acc), Line};
%% Matching non empty Acc treats lines like "m, field=field_val" invalid
tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) ->
    {lists:reverse(Acc), Line};
tags1(Line, Acc) ->
    {Tag, Line1} = tag(Line),
    tags1(Line1, [Tag | Acc]).

%% A single tag pair: both key and value must be non-empty.
tag(Line) ->
    {?NON_EMPTY = Key, Line1} = key(Line),
    {?NON_EMPTY = Val, Line2} = tag_val(Line1),
    {{Key, Val}, Line2}.

%% Tag value ends at ',' (next tag) or ' ' (fields start); the ','
%% between tag pairs is consumed here via strip_l/2.
tag_val(Line) ->
    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []),
    {Val, strip_l(Line1, ?VAL_SEP)}.
%% Fields must be preceded by the separator space; additional leading
%% spaces after it are tolerated.
influx_fields([?SEP | Line]) ->
    fields1(string:trim(Line, leading, "\s"), []).

%% Timestamp is optional, so fields may be at the very end of the line
fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n ->
    {lists:reverse(Acc), Line};
fields1([] = Line, Acc) ->
    {lists:reverse(Acc), Line};
fields1(Line, Acc) ->
    {Field, Line1} = field(Line),
    fields1(Line1, [Field | Acc]).

%% A single field pair: key must be non-empty; value may be empty only
%% when quoted (see field_val/1).
field(Line) ->
    {?NON_EMPTY = Key, Line1} = key(Line),
    {Val, Line2} = field_val(Line1),
    {{Key, Val}, Line2}.

field_val([$" | Line]) ->
    {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
    %% Quoted val can be empty
    {Val, strip_l(Line1, ?VAL_SEP)};
field_val(Line) ->
    %% Unquoted value should not be un-escaped according to Greptimedb protocol,
    %% as it can only hold float, integer, uinteger or boolean value.
    %% However, as templates are possible, un-escaping is applied here,
    %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}"
    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []),
    {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}.
%% Timestamp is optional; when present it is separated from the fields
%% by a space.  Returns `undefined' when absent.
timestamp([?SEP | Line]) ->
    Line1 = string:trim(Line, leading, "\s"),
    %% Similarly to unquoted field value, un-escape a timestamp to validate and handle
    %% potentially escaped characters in a template
    {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []),
    {timestamp1(T), Line2};
timestamp(Line) ->
    {undefined, Line}.
%% Normalize a parsed timestamp token: a non-empty string is kept as-is;
%% an empty token means "no timestamp given".
timestamp1([_ | _] = Token) ->
    Token;
timestamp1(_Empty) ->
    undefined.
%% Common for both tag and field keys
%% Key ends at '='; the '=' itself is consumed via strip_l/2.
key(Line) ->
    {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []),
    {Key, strip_l(Line1, ?KEY_SEP)}.
%% Only strip a character between pairs, don't strip it(and let it fail)
%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val
strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP ->
    [Ch1 | Str];
strip_l(Str, _Ch) ->
    Str.
%% Scan Line until one of SepChars is hit, un-escaping "\X" where X is
%% in EscapeChars.  The inner case expressions intentionally have no
%% catch-all clause: an invalid escape, or a bare escapable char outside
%% SepChars, raises case_clause, which to_influx_lines/1 converts into a
%% parse error.
unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) ->
    ShouldEscapeBackslash = lists:member($\\, EscapeChars),
    Acc1 =
        case lists:member(Char, EscapeChars) of
            true -> [Char | Acc];
            %% keep the backslash literally when '\\' itself is not escapable
            false when not ShouldEscapeBackslash -> [Char, $\\ | Acc]
        end,
    unescape(EscapeChars, SepChars, T, Acc1);
unescape(EscapeChars, SepChars, [Char | T] = L, Acc) ->
    IsEscapeChar = lists:member(Char, EscapeChars),
    case lists:member(Char, SepChars) of
        true -> {lists:reverse(Acc), L};
        false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc])
    end;
unescape(_EscapeChars, _SepChars, [] = L, Acc) ->
    {lists:reverse(Acc), L}.
%% Coerce an atom, binary or char list to a flat char list.
str(Value) when is_binary(Value) ->
    binary_to_list(Value);
str(Value) when is_atom(Value) ->
    atom_to_list(Value);
str(Value) when is_list(Value) ->
    Value.

View File

@ -68,7 +68,9 @@ on_start(InstId, Config) ->
on_stop(InstId, _State) ->
case emqx_resource:get_allocated_resources(InstId) of
#{?greptime_client := Client} ->
greptimedb:stop_client(Client);
Res = greptimedb:stop_client(Client),
?tp(greptimedb_client_stopped, #{instance_id => InstId}),
Res;
_ ->
ok
end.

View File

@ -112,6 +112,9 @@ init_per_group(GreptimedbType, Config0) when
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{proxy_name, ProxyName},
{bridge_type, greptimedb},
{bridge_name, Name},
{bridge_config, GreptimedbConfig},
{greptimedb_host, GreptimedbHost},
{greptimedb_port, GreptimedbPort},
{greptimedb_http_port, GreptimedbHttpPort},
@ -457,6 +460,97 @@ t_start_ok(Config) ->
),
ok.
%% Exercises the bridge start/stop/disable lifecycle and verifies (via
%% the greptimedb_client_stopped trace point) that the client is stopped
%% exactly once for the probe and once per real stop — i.e. probing does
%% not leak a client.
t_start_stop(Config) ->
    %% we can't use this test case directly because `greptimedb_worker' apparently leaks
    %% atoms...
    %% ok = emqx_bridge_testlib:t_start_stop(Config, greptimedb_client_stopped),
    BridgeType = ?config(bridge_type, Config),
    BridgeName = ?config(bridge_name, Config),
    BridgeConfig = ?config(bridge_config, Config),
    StopTracePoint = greptimedb_client_stopped,
    ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
    ?check_trace(
        begin
            %% Probe first: its temporary client must be stopped afterwards.
            ProbeRes0 = emqx_bridge_testlib:probe_bridge_api(
                BridgeType,
                BridgeName,
                BridgeConfig
            ),
            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
            ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
            %% Since the connection process is async, we give it some time to
            %% stabilize and avoid flakiness.
            ?retry(
                _Sleep = 1_000,
                _Attempts = 20,
                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
            ),
            %% `start` bridge to trigger `already_started`
            ?assertMatch(
                {ok, {{_, 204, _}, _Headers, []}},
                emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
            ),
            ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
            %% Stopping must emit the client-stopped trace point.
            ?assertMatch(
                {{ok, _}, {ok, _}},
                ?wait_async_action(
                    emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName),
                    #{?snk_kind := StopTracePoint},
                    5_000
                )
            ),
            ?assertEqual(
                {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
            ),
            %% A second stop is idempotent and must not crash.
            ?assertMatch(
                {ok, {{_, 204, _}, _Headers, []}},
                emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName)
            ),
            ?assertEqual(
                {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
            ),
            ?assertMatch(
                {ok, {{_, 204, _}, _Headers, []}},
                emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
            ),
            ?retry(
                _Sleep = 1_000,
                _Attempts = 20,
                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
            ),
            %% Disable the bridge, which will also stop it.
            ?assertMatch(
                {{ok, _}, {ok, _}},
                ?wait_async_action(
                    emqx_bridge:disable_enable(disable, BridgeType, BridgeName),
                    #{?snk_kind := StopTracePoint},
                    5_000
                )
            ),
            ok
        end,
        fun(Trace) ->
            %% one for probe, two for real
            ?assertMatch(
                [_, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
                ?of_kind(StopTracePoint, Trace)
            ),
            ok
        end
    ),
    ok.
t_start_already_started(Config) ->
Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
Name = ?config(greptimedb_name, Config),

View File

@ -1,348 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb_tests).
-include_lib("eunit/include/eunit.hrl").
-define(INVALID_LINES, [
" ",
" \n",
" \n\n\n ",
"\n",
" \n\n \n \n",
"measurement",
"measurement ",
"measurement,tag",
"measurement field",
"measurement,tag field",
"measurement,tag field ${timestamp}",
"measurement,tag=",
"measurement,tag=tag1",
"measurement,tag =",
"measurement field=",
"measurement field= ",
"measurement field = ",
"measurement, tag = field = ",
"measurement, tag = field = ",
"measurement, tag = tag_val field = field_val",
"measurement, tag = tag_val field = field_val ${timestamp}",
"measurement,= = ${timestamp}",
"measurement,t=a, f=a, ${timestamp}",
"measurement,t=a,t1=b, f=a,f1=b, ${timestamp}",
"measurement,t=a,t1=b, f=a,f1=b,",
"measurement,t=a, t1=b, f=a,f1=b,",
"measurement,t=a,,t1=b, f=a,f1=b,",
"measurement,t=a,,t1=b f=a,,f1=b",
"measurement,t=a,,t1=b f=a,f1=b ${timestamp}",
"measurement, f=a,f1=b",
"measurement, f=a,f1=b ${timestamp}",
"measurement,, f=a,f1=b ${timestamp}",
"measurement,, f=a,f1=b",
"measurement,, f=a,f1=b,, ${timestamp}",
"measurement f=a,f1=b,, ${timestamp}",
"measurement,t=a f=a,f1=b,, ${timestamp}",
"measurement,t=a f=a,f1=b,, ",
"measurement,t=a f=a,f1=b,,",
"measurement, t=a f=a,f1=b",
"measurement,t=a f=a, f1=b",
"measurement,t=a f=a, f1=b ${timestamp}",
"measurement, t=a f=a, f1=b ${timestamp}",
"measurement,t= a f=a,f1=b ${timestamp}",
"measurement,t= a f=a,f1 =b ${timestamp}",
"measurement, t = a f = a,f1 = b ${timestamp}",
"measurement,t=a f=a,f1=b \n ${timestamp}",
"measurement,t=a \n f=a,f1=b \n ${timestamp}",
"measurement,t=a \n f=a,f1=b \n ",
"\n measurement,t=a \n f=a,f1=b \n ${timestamp}",
"\n measurement,t=a \n f=a,f1=b \n",
%% not escaped backslash in a quoted field value is invalid
"measurement,tag=1 field=\"val\\1\""
]).
-define(VALID_LINE_PARSED_PAIRS, [
{"m1,tag=tag1 field=field1 ${timestamp1}", #{
measurement => "m1",
tags => [{"tag", "tag1"}],
fields => [{"field", "field1"}],
timestamp => "${timestamp1}"
}},
{"m2,tag=tag2 field=field2", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field2"}],
timestamp => undefined
}},
{"m3 field=field3 ${timestamp3}", #{
measurement => "m3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${timestamp3}"
}},
{"m4 field=field4", #{
measurement => "m4",
tags => [],
fields => [{"field", "field4"}],
timestamp => undefined
}},
{"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}",
#{
measurement => "m5",
tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
timestamp => "${timestamp5}"
}},
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}},
{"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"",
#{
measurement => "m7",
tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
timestamp => undefined
}},
{"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
#{
measurement => "m8",
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
timestamp => "${timestamp8}"
}},
{"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
#{
measurement => "m9",
tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
timestamp => "${timestamp9}"
}},
{"m10 field=\"\" ${timestamp10}", #{
measurement => "m10",
tags => [],
fields => [{"field", ""}],
timestamp => "${timestamp10}"
}}
]).
-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [
{"\n m1,tag=tag1 field=field1 ${timestamp1} \n", #{
measurement => "m1",
tags => [{"tag", "tag1"}],
fields => [{"field", "field1"}],
timestamp => "${timestamp1}"
}},
{" m2,tag=tag2 field=field2 ", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field2"}],
timestamp => undefined
}},
{" m3 field=field3 ${timestamp3} ", #{
measurement => "m3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${timestamp3}"
}},
{" \n m4 field=field4\n ", #{
measurement => "m4",
tags => [],
fields => [{"field", "field4"}],
timestamp => undefined
}},
{" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5} \n",
#{
measurement => "m5",
tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
timestamp => "${timestamp5}"
}},
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b\n ", #{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}}
]).
-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [
{"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{
measurement => "m =1,",
tags => [{",tag =", "=tag 1,"}],
fields => [{",fie ld ", " field,1"}],
timestamp => "${timestamp1}"
}},
{"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field \"2\",\n"}],
timestamp => undefined
}},
{"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
measurement => "m 3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${payload.timestamp 3}"
}},
{"m4 field=\"\\\"field\\\\4\\\"\"", #{
measurement => "m4",
tags => [],
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{
"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
fields => [
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}
},
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
#{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}},
{
"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
"field_a=field7a,field_b=\"field7b\\\\\n\"",
#{
measurement => " m7 ",
tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
timestamp => undefined
}
},
{
"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
"field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
#{
measurement => "m8",
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
timestamp => "${timestamp8}"
}
},
{"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
#{
measurement => "m\\9",
tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
timestamp => "${timestamp9}"
}},
{"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
measurement => "m,10",
tags => [],
%% backslash should not be un-escaped in tag key
fields => [{"\"field\\\\\"", ""}],
timestamp => "${timestamp10}"
}}
]).
-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [
{" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1} ", #{
measurement => "m =1,",
tags => [{",tag =", "=tag 1,"}],
fields => [{",fie ld ", " field,1"}],
timestamp => "${timestamp1}"
}},
{" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{
measurement => "m2",
tags => [{"tag", "tag2"}],
fields => [{"field", "field \"2\",\n"}],
timestamp => undefined
}},
{" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{
measurement => "m 3",
tags => [],
fields => [{"field", "field3"}],
timestamp => "${payload.timestamp 3}"
}},
{" m4 field=\"\\\"field\\\\4\\\"\" ", #{
measurement => "m4",
tags => [],
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{
" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
fields => [
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}
},
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ",
#{
measurement => "m6",
tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}}
]).
%% Every known-invalid line must make the parser throw.
invalid_write_syntax_line_test_() ->
    [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES].

%% A multi-line payload containing invalid lines is rejected as a whole,
%% regardless of separator style or line ordering.
invalid_write_syntax_multiline_test_() ->
    LinesList = [
        join("\n", ?INVALID_LINES),
        join("\n\n\n", ?INVALID_LINES),
        join("\n\n", lists:reverse(?INVALID_LINES))
    ],
    [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList].

%% Happy-path generators, one per fixture table.
valid_write_syntax_test_() ->
    test_pairs(?VALID_LINE_PARSED_PAIRS).

valid_write_syntax_with_extra_spaces_test_() ->
    test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS).

valid_write_syntax_escaped_chars_test_() ->
    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS).

valid_write_syntax_escaped_chars_with_extra_spaces_test_() ->
    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS).
%% For each {Line, ExpectedPoint} pair, check single-line parsing; then
%% check that the same lines joined into one multi-line payload parse to
%% the full expected list (with varied separators and reversed order).
test_pairs(PairsList) ->
    {Lines, AllExpected} = lists:unzip(PairsList),
    JoinedLines = join("\n", Lines),
    JoinedLines1 = join("\n\n\n", Lines),
    JoinedLines2 = join("\n\n", lists:reverse(Lines)),
    SingleLineTests =
        [
            ?_assertEqual([Expected], to_influx_lines(Line))
            || {Line, Expected} <- PairsList
        ],
    JoinedLinesTests =
        [
            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)),
            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)),
            ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2))
        ],
    SingleLineTests ++ JoinedLinesTests.
%% Join a list of strings with Sep, returning one flat string.
join(Sep, LinesList) ->
    Interleaved = lists:join(Sep, LinesList),
    lists:flatten(Interleaved).
%% Call the bridge parser with primary log level muted, so the expected
%% parse failures in the negative tests don't pollute the test output.
%% The previous level is always restored, even when the parser throws.
to_influx_lines(RawLines) ->
    OldLevel = emqx_logger:get_primary_log_level(),
    try
        %% mute error logs from this call
        emqx_logger:set_primary_log_level(none),
        emqx_bridge_greptimedb:to_influx_lines(RawLines)
    after
        emqx_logger:set_primary_log_level(OldLevel)
    end.

View File

@ -0,0 +1 @@
Fixed an issue which would yield false negatives when testing the connectivity of GreptimeDB bridges.

View File

@ -84,6 +84,7 @@ is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false;
is_community_umbrella_app("apps/emqx_bridge_opents") -> false;
is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false;
is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false;
is_community_umbrella_app("apps/emqx_bridge_greptimedb") -> false;
is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false;
is_community_umbrella_app("apps/emqx_bridge_influxdb") -> false;
is_community_umbrella_app("apps/emqx_bridge_iotdb") -> false;