test(tdengine): cover multi-table insert && subtable name generation

This commit is contained in:
firest 2023-07-13 07:39:01 +00:00
parent d450d34ab3
commit 749b8cd2ed
1 changed files with 86 additions and 65 deletions

View File

@ -13,7 +13,8 @@
% SQL definitions % SQL definitions
-define(SQL_BRIDGE, -define(SQL_BRIDGE,
"insert into mqtt.t_mqtt_msg(ts, payload) values (${timestamp}, ${payload})" "insert into t_mqtt_msg(ts, payload) values (${timestamp}, '${payload}')"
"t_mqtt_msg(ts, payload) values (${second_ts}, '${payload}')"
). ).
-define(SQL_CREATE_DATABASE, "CREATE DATABASE IF NOT EXISTS mqtt; USE mqtt;"). -define(SQL_CREATE_DATABASE, "CREATE DATABASE IF NOT EXISTS mqtt; USE mqtt;").
@ -29,7 +30,8 @@
-define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg"). -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").
-define(AUTO_CREATE_BRIDGE, -define(AUTO_CREATE_BRIDGE,
"insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})" "insert into ${clientid} USING s_tab TAGS ('${clientid}') values (${timestamp}, '${payload}')"
"test_${clientid} USING s_tab TAGS ('${clientid}') values (${second_ts}, '${payload}')"
). ).
-define(SQL_CREATE_STABLE, -define(SQL_CREATE_STABLE,
@ -301,7 +303,7 @@ connect_and_clear_table(Config) ->
connect_and_get_payload(Config) -> connect_and_get_payload(Config) ->
?WITH_CON( ?WITH_CON(
{ok, #{<<"code">> := 0, <<"data">> := [[Result]]}} = directly_query(Con, ?SQL_SELECT) {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, ?SQL_SELECT)
), ),
Result. Result.
@ -329,7 +331,7 @@ t_setup_via_config_and_publish(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
?check_trace( ?check_trace(
begin begin
{_, {ok, #{result := Result}}} = {_, {ok, #{result := Result}}} =
@ -342,7 +344,7 @@ t_setup_via_config_and_publish(Config) ->
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
), ),
?assertMatch( ?assertMatch(
?PAYLOAD, [[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config) connect_and_get_payload(Config)
), ),
ok ok
@ -368,7 +370,8 @@ t_setup_via_http_api_and_publish(Config) ->
{ok, _}, {ok, _},
create_bridge_http(TDengineConfig) create_bridge_http(TDengineConfig)
), ),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
?check_trace( ?check_trace(
begin begin
Request = {send_message, SentData}, Request = {send_message, SentData},
@ -386,7 +389,7 @@ t_setup_via_http_api_and_publish(Config) ->
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0 {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0
), ),
?assertMatch( ?assertMatch(
?PAYLOAD, [[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config) connect_and_get_payload(Config)
), ),
ok ok
@ -426,7 +429,7 @@ t_write_failure(Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
{_, {ok, #{result := Result}}} = {_, {ok, #{result := Result}}} =
?wait_async_action( ?wait_async_action(
@ -461,7 +464,7 @@ t_write_timeout(Config) ->
} }
} }
), ),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
%% FIXME: TDengine connector hangs indefinetily during %% FIXME: TDengine connector hangs indefinetily during
%% `call_query' while the connection is unresponsive. Should add %% `call_query' while the connection is unresponsive. Should add
%% a timeout to `APPLY_RESOURCE' in buffer worker?? %% a timeout to `APPLY_RESOURCE' in buffer worker??
@ -486,7 +489,7 @@ t_simple_sql_query(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
Request = {query, <<"SELECT count(1) AS T">>}, Request = {query, <<"SELECT 1 AS T">>},
{_, {ok, #{result := Result}}} = {_, {ok, #{result := Result}}} =
?wait_async_action( ?wait_async_action(
query_resource(Config, Request), query_resource(Config, Request),
@ -537,37 +540,41 @@ t_bad_sql_parameter(Config) ->
?assertMatch({error, {unrecoverable_error, invalid_request}}, Result), ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
ok. ok.
t_nasty_sql_string(Config) -> %% TODO
?assertMatch( %% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders,
{ok, _}, %% the SQL quote(escape) is removed now,
create_bridge(Config) %% we should introduce a new syntax for placeholders to allow some vars to keep unquote.
), %% t_nasty_sql_string(Config) ->
% NOTE %% ?assertMatch(
% Column `payload` has BINARY type, so we would certainly like to test it %% {ok, _},
% with `lists:seq(1, 127)`, but: %% create_bridge(Config)
% 1. There's no way to insert zero byte in an SQL string, seems that TDengine's %% ),
% parser[1] has no escaping sequence for it so a zero byte probably confuses %% % NOTE
% interpreter somewhere down the line. %% % Column `payload` has BINARY type, so we would certainly like to test it
% 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for %% % with `lists:seq(1, 127)`, but:
% some reason. %% % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
% %% % parser[1] has no escaping sequence for it so a zero byte probably confuses
% [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301 %% % interpreter somewhere down the line.
Payload = list_to_binary(lists:seq(1, 127)), %% % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, %% % some reason.
{_, {ok, #{result := Result}}} = %% %
?wait_async_action( %% % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
send_message(Config, Message), %% Payload = list_to_binary(lists:seq(1, 127)),
#{?snk_kind := buffer_worker_flush_ack}, %% Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
2_000 %% {_, {ok, #{result := Result}}} =
), %% ?wait_async_action(
?assertMatch( %% send_message(Config, Message),
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, %% #{?snk_kind := buffer_worker_flush_ack},
Result %% 2_000
), %% ),
?assertEqual( %% ?assertMatch(
Payload, %% {ok, #{<<"code">> := 0, <<"rows">> := 1}},
connect_and_get_payload(Config) %% Result
). %% ),
%% ?assertEqual(
%% Payload,
%% connect_and_get_payload(Config)
%% ).
t_simple_insert(Config) -> t_simple_insert(Config) ->
connect_and_clear_table(Config), connect_and_clear_table(Config),
@ -576,7 +583,7 @@ t_simple_insert(Config) ->
create_bridge(Config) create_bridge(Config)
), ),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
Request = {send_message, SentData}, Request = {send_message, SentData},
{_, {ok, #{result := _Result}}} = {_, {ok, #{result := _Result}}} =
?wait_async_action( ?wait_async_action(
@ -585,7 +592,7 @@ t_simple_insert(Config) ->
2_000 2_000
), ),
?assertMatch( ?assertMatch(
?PAYLOAD, [[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config) connect_and_get_payload(Config)
). ).
@ -602,7 +609,9 @@ t_batch_insert(Config) ->
?wait_async_action( ?wait_async_action(
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx}, SentData = #{
payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000
},
Request = {send_message, SentData}, Request = {send_message, SentData},
query_resource(Config, Request) query_resource(Config, Request)
end, end,
@ -613,11 +622,12 @@ t_batch_insert(Config) ->
2_000 2_000
), ),
DoubleSize = Size * 2,
?retry( ?retry(
_Sleep = 50, _Sleep = 50,
_Attempts = 30, _Attempts = 30,
?assertMatch( ?assertMatch(
[[Size]], [[DoubleSize]],
connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
) )
). ).
@ -633,6 +643,7 @@ t_auto_create_simple_insert(Config0) ->
SentData = #{ SentData = #{
payload => ?PAYLOAD, payload => ?PAYLOAD,
timestamp => 1668602148000, timestamp => 1668602148000,
second_ts => 1668602148000 + 100,
clientid => ClientId clientid => ClientId
}, },
Request = {send_message, SentData}, Request = {send_message, SentData},
@ -647,9 +658,19 @@ t_auto_create_simple_insert(Config0) ->
connect_and_query(Config, "SELECT payload FROM " ++ ClientId) connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
), ),
?assertMatch(
[[?PAYLOAD]],
connect_and_query(Config, "SELECT payload FROM test_" ++ ClientId)
),
?assertMatch( ?assertMatch(
[[0]], [[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId) connect_and_query(Config, "DROP TABLE " ++ ClientId)
),
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE test_" ++ ClientId)
). ).
t_auto_create_batch_insert(Config0) -> t_auto_create_batch_insert(Config0) ->
@ -675,6 +696,7 @@ t_auto_create_batch_insert(Config0) ->
SentData = #{ SentData = #{
payload => ?PAYLOAD, payload => ?PAYLOAD,
timestamp => Ts + Idx + Offset, timestamp => Ts + Idx + Offset,
second_ts => Ts + Idx + Offset + 5000,
clientid => ClientId clientid => ClientId
}, },
Request = {send_message, SentData}, Request = {send_message, SentData},
@ -693,29 +715,28 @@ t_auto_create_batch_insert(Config0) ->
_Sleep = 50, _Sleep = 50,
_Attempts = 30, _Attempts = 30,
lists:foreach(
fun({Table, Size}) ->
?assertMatch( ?assertMatch(
[[Size1]], [[Size]],
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) connect_and_query(Config, "SELECT COUNT(1) FROM " ++ Table)
)
end,
lists:zip(
[ClientId1, "test_" ++ ClientId1, ClientId2, "test_" ++ ClientId2],
[Size1, Size1, Size2, Size2]
)
) )
), ),
?retry( lists:foreach(
50, fun(E) ->
30,
?assertMatch( ?assertMatch(
[[Size2]], [[0]],
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) connect_and_query(Config, "DROP TABLE " ++ E)
) )
), end,
[ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2]
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId1)
),
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId2)
). ).
to_bin(List) when is_list(List) -> to_bin(List) when is_list(List) ->