From 49218569503e6502edd6a180a2f170626efd9304 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 20 Jul 2023 20:10:29 +0800 Subject: [PATCH] test: make all emqx_bridge_greptimedb_SUITE tests passing --- .../src/emqx_bridge_greptimedb_connector.erl | 4 +- .../test/emqx_bridge_greptimedb_SUITE.erl | 94 +++++++++++-------- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 43455d5d2..655351842 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -81,7 +81,7 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c #{batch => false, mode => sync, error => ErrorPoints} ), log_error_points(InstId, ErrorPoints), - ErrorPoints + {error, ErrorPoints} end. %% Once a Batched Data trans to points failed. @@ -463,7 +463,7 @@ parse_timestamp([TsBin]) -> continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> case line_to_point(Data, Item) of - #{fields := Fields} when map_size(Fields) =:= 0 -> + {_, [#{fields := Fields}]} when map_size(Fields) =:= 0 -> %% greptimedb client doesn't like empty field maps... ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc], lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1); diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index 044d6a2bd..b7fb6451e 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -284,7 +285,8 @@ send_message(Config, Payload) -> Name = ?config(greptimedb_name, Config), Type = greptimedb_type_bin(?config(greptimedb_type, Config)), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - emqx_bridge:send_message(BridgeId, Payload). + Resp = emqx_bridge:send_message(BridgeId, Payload), + Resp. query_by_clientid(Topic, ClientId, Config) -> GreptimedbHost = ?config(greptimedb_host, Config), @@ -308,7 +310,7 @@ query_by_clientid(Topic, ClientId, Config) -> {"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="}, {"Content-Type", "application/x-www-form-urlencoded"} ], - Body = <<"sql=select * from ", Topic/binary, " where clientid='", ClientId/binary, "'">>, + Body = <<"sql=select * from \"", Topic/binary, "\" where clientid='", ClientId/binary, "'">>, {ok, 200, _Headers, RawBody0} = ehttpc:request( EHttpcPoolName, @@ -317,29 +319,49 @@ query_by_clientid(Topic, ClientId, Config) -> _Timeout = 10_000, _Retry = 0 ), - #{ - <<"code">> := 0, - <<"output">> := [ - #{ - <<"records">> := #{ - <<"rows">> := Rows, - <<"schema">> := Schema - } - } - ] - } = emqx_utils_json:decode(RawBody0, [return_maps]), - case Schema of - null -> - #{}; - #{<<"column_schemas">> := ColumnsSchemas} -> - Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas), - index_by_field(Rows, Columns) + case emqx_utils_json:decode(RawBody0, [return_maps]) of + #{ + <<"code">> := 0, + <<"output">> := [ + #{ + <<"records">> := #{ + <<"rows">> := Rows, + <<"schema">> := Schema + } + } + ] + } -> + make_row(Schema, Rows); + #{ + <<"code">> := Code, + <<"error">> := Error + } -> + GreptimedbName = ?config(greptimedb_name, Config), + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName), + + ?SLOG(error, #{ + msg => io_lib:format("Failed to query: ~p, ~p", [Code, Error]), + connector => BridgeId, + reason => Error + }), + %% TODO(dennis): check the error by code + case binary:match(Error, <<"Table not found">>) of + nomatch -> + {error, Error}; + _ -> + %% Table not found + #{} + end end. -index_by_field([], Columns) -> +make_row(null, _Rows) -> #{}; -index_by_field([Row], Columns) -> +make_row(_Schema, []) -> + #{}; +make_row(#{<<"column_schemas">> := ColumnsSchemas}, [Row]) -> + Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas), maps:from_list(lists:zip(Columns, Row)). assert_persisted_data(ClientId, Expected, PersistedData) -> @@ -784,26 +806,22 @@ t_write_failure(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> case QueryMode of sync -> - {_, {ok, _}} = - ?wait_async_action( - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - send_message(Config, SentData) - ), - #{?snk_kind := handle_async_reply, action := nack}, - 1_000 - ) + ?wait_async_action( + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + send_message(Config, SentData) + ), + #{?snk_kind := greptimedb_connector_do_query_failure, action := nack}, + 16_000 + ) end end), - fun(Trace0) -> + fun(Trace) -> case QueryMode of sync -> - Trace = ?of_kind(handle_async_reply, Trace0), - ?assertMatch([_ | _], Trace), - [#{result := Result} | _] = Trace, - ?assert( - not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result), - #{got => Result} + ?assertMatch( + [#{error := _} | _], + ?of_kind(greptimedb_connector_do_query_failure, Trace) ) end, ok @@ -841,7 +859,7 @@ t_missing_field(Config) -> ?match_n_events(NEvents, #{ ?snk_kind := greptimedb_connector_send_query_error }), - _Timeout1 = 10_000 + _Timeout1 = 16_000 ), ok end,