diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index fa7e0d4af..c3e3d34e2 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl index 877e464dd..d63103e2e 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl @@ -21,11 +21,6 @@ desc/1 ]). --type write_syntax() :: list(). --reflect_type([write_syntax/0]). --typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). --export([to_influx_lines/1]). - %% ------------------------------------------------------------------------------------------------- %% api @@ -131,169 +126,16 @@ desc(_) -> undefined. write_syntax(type) -> - ?MODULE:write_syntax(); + emqx_bridge_influxdb:write_syntax(); write_syntax(required) -> true; write_syntax(validator) -> [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; write_syntax(converter) -> - fun to_influx_lines/1; + fun emqx_bridge_influxdb:to_influx_lines/1; write_syntax(desc) -> ?DESC("write_syntax"); write_syntax(format) -> <<"sql">>; write_syntax(_) -> undefined. - -to_influx_lines(RawLines) -> - try - influx_lines(str(RawLines), []) - catch - _:Reason:Stacktrace -> - Msg = lists:flatten( - io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines]) - ), - ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}), - throw(Msg) - end. - --define(MEASUREMENT_ESC_CHARS, [$,, $\s]). --define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]). --define(FIELD_VAL_ESC_CHARS, [$", $\\]). -% Common separator for both tags and fields --define(SEP, $\s). --define(MEASUREMENT_TAG_SEP, $,). --define(KEY_SEP, $=). --define(VAL_SEP, $,). --define(NON_EMPTY, [_ | _]). - -influx_lines([] = _RawLines, Acc) -> - ?NON_EMPTY = lists:reverse(Acc); -influx_lines(RawLines, Acc) -> - {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc), - influx_lines(RawLines1, Acc1). - -influx_line([], Acc) -> - {Acc, []}; -influx_line(Line, Acc) -> - {?NON_EMPTY = Measurement, Line1} = measurement(Line), - {Tags, Line2} = tags(Line1), - {?NON_EMPTY = Fields, Line3} = influx_fields(Line2), - {Timestamp, Line4} = timestamp(Line3), - { - [ - #{ - measurement => Measurement, - tags => Tags, - fields => Fields, - timestamp => Timestamp - } - | Acc - ], - Line4 - }. - -measurement(Line) -> - unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []). - -tags([?MEASUREMENT_TAG_SEP | Line]) -> - tags1(Line, []); -tags(Line) -> - {[], Line}. - -%% Empty line is invalid as fields are required after tags, -%% need to break recursion here and fail later on parsing fields -tags1([] = Line, Acc) -> - {lists:reverse(Acc), Line}; -%% Matching non empty Acc treats lines like "m, field=field_val" invalid -tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) -> - {lists:reverse(Acc), Line}; -tags1(Line, Acc) -> - {Tag, Line1} = tag(Line), - tags1(Line1, [Tag | Acc]). - -tag(Line) -> - {?NON_EMPTY = Key, Line1} = key(Line), - {?NON_EMPTY = Val, Line2} = tag_val(Line1), - {{Key, Val}, Line2}. - -tag_val(Line) -> - {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []), - {Val, strip_l(Line1, ?VAL_SEP)}. - -influx_fields([?SEP | Line]) -> - fields1(string:trim(Line, leading, "\s"), []). - -%% Timestamp is optional, so fields may be at the very end of the line -fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n -> - {lists:reverse(Acc), Line}; -fields1([] = Line, Acc) -> - {lists:reverse(Acc), Line}; -fields1(Line, Acc) -> - {Field, Line1} = field(Line), - fields1(Line1, [Field | Acc]). - -field(Line) -> - {?NON_EMPTY = Key, Line1} = key(Line), - {Val, Line2} = field_val(Line1), - {{Key, Val}, Line2}. - -field_val([$" | Line]) -> - {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []), - %% Quoted val can be empty - {Val, strip_l(Line1, ?VAL_SEP)}; -field_val(Line) -> - %% Unquoted value should not be un-escaped according to Greptimedb protocol, - %% as it can only hold float, integer, uinteger or boolean value. - %% However, as templates are possible, un-escaping is applied here, - %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}" - {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []), - {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}. - -timestamp([?SEP | Line]) -> - Line1 = string:trim(Line, leading, "\s"), - %% Similarly to unquoted field value, un-escape a timestamp to validate and handle - %% potentially escaped characters in a template - {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []), - {timestamp1(T), Line2}; -timestamp(Line) -> - {undefined, Line}. - -timestamp1(?NON_EMPTY = Ts) -> Ts; -timestamp1(_Ts) -> undefined. - -%% Common for both tag and field keys -key(Line) -> - {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []), - {Key, strip_l(Line1, ?KEY_SEP)}. - -%% Only strip a character between pairs, don't strip it(and let it fail) -%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val -strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP -> - [Ch1 | Str]; -strip_l(Str, _Ch) -> - Str. - -unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) -> - ShouldEscapeBackslash = lists:member($\\, EscapeChars), - Acc1 = - case lists:member(Char, EscapeChars) of - true -> [Char | Acc]; - false when not ShouldEscapeBackslash -> [Char, $\\ | Acc] - end, - unescape(EscapeChars, SepChars, T, Acc1); -unescape(EscapeChars, SepChars, [Char | T] = L, Acc) -> - IsEscapeChar = lists:member(Char, EscapeChars), - case lists:member(Char, SepChars) of - true -> {lists:reverse(Acc), L}; - false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc]) - end; -unescape(_EscapeChars, _SepChars, [] = L, Acc) -> - {lists:reverse(Acc), L}. - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 89fad78d2..d474db58c 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -68,7 +68,9 @@ on_start(InstId, Config) -> on_stop(InstId, _State) -> case emqx_resource:get_allocated_resources(InstId) of #{?greptime_client := Client} -> - greptimedb:stop_client(Client); + Res = greptimedb:stop_client(Client), + ?tp(greptimedb_client_stopped, #{instance_id => InstId}), + Res; _ -> ok end. diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index d4bc5b01e..15133a1ad 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -112,6 +112,9 @@ init_per_group(GreptimedbType, Config0) when {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {proxy_name, ProxyName}, + {bridge_type, greptimedb}, + {bridge_name, Name}, + {bridge_config, GreptimedbConfig}, {greptimedb_host, GreptimedbHost}, {greptimedb_port, GreptimedbPort}, {greptimedb_http_port, GreptimedbHttpPort}, @@ -457,6 +460,97 @@ t_start_ok(Config) -> ), ok. +t_start_stop(Config) -> + %% we can't use this test case directly because `greptimedb_worker' apparently leaks + %% atoms... + %% ok = emqx_bridge_testlib:t_start_stop(Config, greptimedb_client_stopped), + BridgeType = ?config(bridge_type, Config), + BridgeName = ?config(bridge_name, Config), + BridgeConfig = ?config(bridge_config, Config), + StopTracePoint = greptimedb_client_stopped, + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + ?check_trace( + begin + ProbeRes0 = emqx_bridge_testlib:probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% `start` bridge to trigger `already_started` + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Disable the bridge, which will also stop it. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge:disable_enable(disable, BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ok + end, + fun(Trace) -> + %% one for probe, two for real + ?assertMatch( + [_, #{instance_id := ResourceId}, #{instance_id := ResourceId}], + ?of_kind(StopTracePoint, Trace) + ), + ok + end + ), + ok. + t_start_already_started(Config) -> Type = greptimedb_type_bin(?config(greptimedb_type, Config)), Name = ?config(greptimedb_name, Config), diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl deleted file mode 100644 index a07ccd92d..000000000 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl +++ /dev/null @@ -1,348 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_bridge_greptimedb_tests). - --include_lib("eunit/include/eunit.hrl"). - --define(INVALID_LINES, [ - " ", - " \n", - " \n\n\n ", - "\n", - " \n\n \n \n", - "measurement", - "measurement ", - "measurement,tag", - "measurement field", - "measurement,tag field", - "measurement,tag field ${timestamp}", - "measurement,tag=", - "measurement,tag=tag1", - "measurement,tag =", - "measurement field=", - "measurement field= ", - "measurement field = ", - "measurement, tag = field = ", - "measurement, tag = field = ", - "measurement, tag = tag_val field = field_val", - "measurement, tag = tag_val field = field_val ${timestamp}", - "measurement,= = ${timestamp}", - "measurement,t=a, f=a, ${timestamp}", - "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}", - "measurement,t=a,t1=b, f=a,f1=b,", - "measurement,t=a, t1=b, f=a,f1=b,", - "measurement,t=a,,t1=b, f=a,f1=b,", - "measurement,t=a,,t1=b f=a,,f1=b", - "measurement,t=a,,t1=b f=a,f1=b ${timestamp}", - "measurement, f=a,f1=b", - "measurement, f=a,f1=b ${timestamp}", - "measurement,, f=a,f1=b ${timestamp}", - "measurement,, f=a,f1=b", - "measurement,, f=a,f1=b,, ${timestamp}", - "measurement f=a,f1=b,, ${timestamp}", - "measurement,t=a f=a,f1=b,, ${timestamp}", - "measurement,t=a f=a,f1=b,, ", - "measurement,t=a f=a,f1=b,,", - "measurement, t=a f=a,f1=b", - "measurement,t=a f=a, f1=b", - "measurement,t=a f=a, f1=b ${timestamp}", - "measurement, t=a f=a, f1=b ${timestamp}", - "measurement,t= a f=a,f1=b ${timestamp}", - "measurement,t= a f=a,f1 =b ${timestamp}", - "measurement, t = a f = a,f1 = b ${timestamp}", - "measurement,t=a f=a,f1=b \n ${timestamp}", - "measurement,t=a \n f=a,f1=b \n ${timestamp}", - "measurement,t=a \n f=a,f1=b \n ", - "\n measurement,t=a \n f=a,f1=b \n ${timestamp}", - "\n measurement,t=a \n f=a,f1=b \n", - %% not escaped backslash in a quoted field value is invalid - "measurement,tag=1 field=\"val\\1\"" -]). - --define(VALID_LINE_PARSED_PAIRS, [ - {"m1,tag=tag1 field=field1 ${timestamp1}", #{ - measurement => "m1", - tags => [{"tag", "tag1"}], - fields => [{"field", "field1"}], - timestamp => "${timestamp1}" - }}, - {"m2,tag=tag2 field=field2", #{ - measurement => "m2", - tags => [{"tag", "tag2"}], - fields => [{"field", "field2"}], - timestamp => undefined - }}, - {"m3 field=field3 ${timestamp3}", #{ - measurement => "m3", - tags => [], - fields => [{"field", "field3"}], - timestamp => "${timestamp3}" - }}, - {"m4 field=field4", #{ - measurement => "m4", - tags => [], - fields => [{"field", "field4"}], - timestamp => undefined - }}, - {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}", - #{ - measurement => "m5", - tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], - fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], - timestamp => "${timestamp5}" - }}, - {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{ - measurement => "m6", - tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], - timestamp => undefined - }}, - {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"", - #{ - measurement => "m7", - tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}], - timestamp => undefined - }}, - {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}", - #{ - measurement => "m8", - tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}], - timestamp => "${timestamp8}" - }}, - {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", - #{ - measurement => "m9", - tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], - timestamp => "${timestamp9}" - }}, - {"m10 field=\"\" ${timestamp10}", #{ - measurement => "m10", - tags => [], - fields => [{"field", ""}], - timestamp => "${timestamp10}" - }} -]). - --define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [ - {"\n m1,tag=tag1 field=field1 ${timestamp1} \n", #{ - measurement => "m1", - tags => [{"tag", "tag1"}], - fields => [{"field", "field1"}], - timestamp => "${timestamp1}" - }}, - {" m2,tag=tag2 field=field2 ", #{ - measurement => "m2", - tags => [{"tag", "tag2"}], - fields => [{"field", "field2"}], - timestamp => undefined - }}, - {" m3 field=field3 ${timestamp3} ", #{ - measurement => "m3", - tags => [], - fields => [{"field", "field3"}], - timestamp => "${timestamp3}" - }}, - {" \n m4 field=field4\n ", #{ - measurement => "m4", - tags => [], - fields => [{"field", "field4"}], - timestamp => undefined - }}, - {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5} \n", - #{ - measurement => "m5", - tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], - fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], - timestamp => "${timestamp5}" - }}, - {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b\n ", #{ - measurement => "m6", - tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], - timestamp => undefined - }} -]). - --define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [ - {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{ - measurement => "m =1,", - tags => [{",tag =", "=tag 1,"}], - fields => [{",fie ld ", " field,1"}], - timestamp => "${timestamp1}" - }}, - {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{ - measurement => "m2", - tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], - timestamp => undefined - }}, - {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{ - measurement => "m 3", - tags => [], - fields => [{"field", "field3"}], - timestamp => "${payload.timestamp 3}" - }}, - {"m4 field=\"\\\"field\\\\4\\\"\"", #{ - measurement => "m4", - tags => [], - fields => [{"field", "\"field\\4\""}], - timestamp => undefined - }}, - { - "m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," - "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}", - #{ - measurement => "m5,mA", - tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], - fields => [ - {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} - ], - timestamp => "${timestamp5}" - } - }, - {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"", - #{ - measurement => "m6", - tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], - timestamp => undefined - }}, - { - "\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\"," - "field_a=field7a,field_b=\"field7b\\\\\n\"", - #{ - measurement => " m7 ", - tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}], - fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}], - timestamp => undefined - } - }, - { - "m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a," - "field_b=\"\\\"field\\\" = 8b\" ${timestamp8}", - #{ - measurement => "m8", - tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], - fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}], - timestamp => "${timestamp8}" - } - }, - {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", - #{ - measurement => "m\\9", - tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], - fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], - timestamp => "${timestamp9}" - }}, - {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{ - measurement => "m,10", - tags => [], - %% backslash should not be un-escaped in tag key - fields => [{"\"field\\\\\"", ""}], - timestamp => "${timestamp10}" - }} -]). - --define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [ - {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1} ", #{ - measurement => "m =1,", - tags => [{",tag =", "=tag 1,"}], - fields => [{",fie ld ", " field,1"}], - timestamp => "${timestamp1}" - }}, - {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{ - measurement => "m2", - tags => [{"tag", "tag2"}], - fields => [{"field", "field \"2\",\n"}], - timestamp => undefined - }}, - {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{ - measurement => "m 3", - tags => [], - fields => [{"field", "field3"}], - timestamp => "${payload.timestamp 3}" - }}, - {" m4 field=\"\\\"field\\\\4\\\"\" ", #{ - measurement => "m4", - tags => [], - fields => [{"field", "\"field\\4\""}], - timestamp => undefined - }}, - { - " m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," - "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ", - #{ - measurement => "m5,mA", - tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], - fields => [ - {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} - ], - timestamp => "${timestamp5}" - } - }, - {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ", - #{ - measurement => "m6", - tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], - fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], - timestamp => undefined - }} -]). - -invalid_write_syntax_line_test_() -> - [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES]. - -invalid_write_syntax_multiline_test_() -> - LinesList = [ - join("\n", ?INVALID_LINES), - join("\n\n\n", ?INVALID_LINES), - join("\n\n", lists:reverse(?INVALID_LINES)) - ], - [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList]. - -valid_write_syntax_test_() -> - test_pairs(?VALID_LINE_PARSED_PAIRS). - -valid_write_syntax_with_extra_spaces_test_() -> - test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS). - -valid_write_syntax_escaped_chars_test_() -> - test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS). - -valid_write_syntax_escaped_chars_with_extra_spaces_test_() -> - test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS). - -test_pairs(PairsList) -> - {Lines, AllExpected} = lists:unzip(PairsList), - JoinedLines = join("\n", Lines), - JoinedLines1 = join("\n\n\n", Lines), - JoinedLines2 = join("\n\n", lists:reverse(Lines)), - SingleLineTests = - [ - ?_assertEqual([Expected], to_influx_lines(Line)) - || {Line, Expected} <- PairsList - ], - JoinedLinesTests = - [ - ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)), - ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)), - ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2)) - ], - SingleLineTests ++ JoinedLinesTests. - -join(Sep, LinesList) -> - lists:flatten(lists:join(Sep, LinesList)). - -to_influx_lines(RawLines) -> - OldLevel = emqx_logger:get_primary_log_level(), - try - %% mute error logs from this call - emqx_logger:set_primary_log_level(none), - emqx_bridge_greptimedb:to_influx_lines(RawLines) - after - emqx_logger:set_primary_log_level(OldLevel) - end. diff --git a/changes/ee/fix-11492.en.md b/changes/ee/fix-11492.en.md new file mode 100644 index 000000000..9f61abee2 --- /dev/null +++ b/changes/ee/fix-11492.en.md @@ -0,0 +1 @@ +Fixed an issue which would yield false negatives when testing the connectivity of GreptimeDB bridges. diff --git a/rebar.config.erl b/rebar.config.erl index ad6f425a0..02e946e15 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -84,6 +84,7 @@ is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false; is_community_umbrella_app("apps/emqx_bridge_opents") -> false; is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false; is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false; +is_community_umbrella_app("apps/emqx_bridge_greptimedb") -> false; is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false; is_community_umbrella_app("apps/emqx_bridge_influxdb") -> false; is_community_umbrella_app("apps/emqx_bridge_iotdb") -> false;