diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 46f28ad40..14ea202be 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -13,8 +13,13 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). % SQL definitions --define(STREAM, "stream"). + +-define(STREAM, "demo_stream"). +%% could not be "stream" in Production Environment +%% especially not in hstreamdb_sql CLI client + -define(REPLICATION_FACTOR, 1). + %% in seconds -define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)). -define(SHARD_COUNT, 1). @@ -146,16 +151,23 @@ t_setup_via_config_and_publish(Config) -> begin ?wait_async_action( ?assertEqual(ok, send_message(Config, Data)), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ), ok end, fun(Trace0) -> - Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + Trace = ?of_kind(hstreamdb_connector_query_append_return, Trace0), lists:foreach( fun(EachTrace) -> - ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end end, Trace ), @@ -181,16 +193,26 @@ t_setup_via_http_api_and_publish(Config) -> begin ?wait_async_action( ?assertEqual(ok, send_message(Config, Data)), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ), ok end, fun(Trace) -> - ?assertMatch( - [#{result := #{streamName := <>}}], - ?of_kind(hstreamdb_connector_query_return, Trace) - ) + lists:foreach( + fun(EachTrace) -> + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end + end, + ?of_kind(hstreamdb_connector_query_append_return, Trace) + ), + ok end ), ok. @@ -240,6 +262,7 @@ t_write_failure(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), Data = rand_data(), {{ok, _}, {ok, _}} = ?wait_async_action( @@ -251,10 +274,16 @@ t_write_failure(Config) -> health_check_resource_down(Config), case QueryMode of sync -> - ?assertMatch( - {error, {resource_error, #{msg := "call resource timeout", reason := timeout}}}, - send_message(Config, Data) - ); + case EnableBatch of + true -> + %% append to batch always returns ok + ?assertMatch(ok, send_message(Config, Data)); + false -> + ?assertMatch( + {error, {cannot_list_shards, {<>, econnrefused}}}, + send_message(Config, Data) + ) + end; async -> %% TODO: async mode is not supported yet, %% but it will return ok if calling emqx_resource_buffer_worker:async_query/3, @@ -282,17 +311,23 @@ t_simple_query(Config) -> end, Requests ), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ) end, - fun(Trace0) -> - Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + fun(Trace) -> lists:foreach( fun(EachTrace) -> - ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end end, - Trace + ?of_kind(hstreamdb_connector_query_append_return, Trace) ), ok end