test: new batch mechanism of hstreamdb_erl

This commit is contained in:
JimMoen 2023-08-18 18:10:29 +08:00
parent 5527edf442
commit 649647190e
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
1 changed files with 53 additions and 18 deletions

View File

@ -13,8 +13,13 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
% SQL definitions
-define(STREAM, "stream").
-define(STREAM, "demo_stream").
%% could not be "stream" in Production Environment
%% especially not in hstreamdb_sql CLI client
-define(REPLICATION_FACTOR, 1).
%% in seconds
-define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)).
-define(SHARD_COUNT, 1).
@ -146,16 +151,23 @@ t_setup_via_config_and_publish(Config) ->
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, Data)),
#{?snk_kind := hstreamdb_connector_query_return},
#{?snk_kind := hstreamdb_connector_query_append_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
Trace = ?of_kind(hstreamdb_connector_query_append_return, Trace0),
lists:foreach(
fun(EachTrace) ->
?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
case ?config(enable_batch, Config) of
true ->
?assertMatch(#{result := ok, is_batch := true}, EachTrace);
false ->
?assertMatch(
#{result := #{'batchId' := _}, is_batch := false}, EachTrace
)
end
end,
Trace
),
@ -181,17 +193,27 @@ t_setup_via_http_api_and_publish(Config) ->
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, Data)),
#{?snk_kind := hstreamdb_connector_query_return},
#{?snk_kind := hstreamdb_connector_query_append_return},
10_000
),
ok
end,
fun(Trace) ->
lists:foreach(
fun(EachTrace) ->
case ?config(enable_batch, Config) of
true ->
?assertMatch(#{result := ok, is_batch := true}, EachTrace);
false ->
?assertMatch(
[#{result := #{streamName := <<?STREAM>>}}],
?of_kind(hstreamdb_connector_query_return, Trace)
#{result := #{'batchId' := _}, is_batch := false}, EachTrace
)
end
end,
?of_kind(hstreamdb_connector_query_append_return, Trace)
),
ok
end
),
ok.
@ -240,6 +262,7 @@ t_write_failure(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
EnableBatch = ?config(enable_batch, Config),
Data = rand_data(),
{{ok, _}, {ok, _}} =
?wait_async_action(
@ -251,10 +274,16 @@ t_write_failure(Config) ->
health_check_resource_down(Config),
case QueryMode of
sync ->
case EnableBatch of
true ->
%% append to batch always returns ok
?assertMatch(ok, send_message(Config, Data));
false ->
?assertMatch(
{error, {resource_error, #{msg := "call resource timeout", reason := timeout}}},
{error, {cannot_list_shards, {<<?STREAM>>, econnrefused}}},
send_message(Config, Data)
);
)
end;
async ->
%% TODO: async mode is not supported yet,
%% but it will return ok if calling emqx_resource_buffer_worker:async_query/3,
@ -282,17 +311,23 @@ t_simple_query(Config) ->
end,
Requests
),
#{?snk_kind := hstreamdb_connector_query_return},
#{?snk_kind := hstreamdb_connector_query_append_return},
10_000
)
end,
fun(Trace0) ->
Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
fun(Trace) ->
lists:foreach(
fun(EachTrace) ->
?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
case ?config(enable_batch, Config) of
true ->
?assertMatch(#{result := ok, is_batch := true}, EachTrace);
false ->
?assertMatch(
#{result := #{'batchId' := _}, is_batch := false}, EachTrace
)
end
end,
Trace
?of_kind(hstreamdb_connector_query_append_return, Trace)
),
ok
end