test: add more EE mysql bridge test cases
This commit is contained in:
parent
23ac426608
commit
7df24000a0
|
@ -419,7 +419,7 @@ on_sql_query(
|
|||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
?tp(
|
||||
mysql_connector_send_query,
|
||||
#{sql_or_key => SQLOrKey, data => Data}
|
||||
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
|
||||
),
|
||||
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
|
||||
{error, disconnected} = Result ->
|
||||
|
@ -431,6 +431,10 @@ on_sql_query(
|
|||
_ = exit(Conn, restart),
|
||||
Result;
|
||||
{error, not_prepared} = Error ->
|
||||
?tp(
|
||||
mysql_connector_prepare_query_failed,
|
||||
#{error => not_prepared}
|
||||
),
|
||||
?SLOG(
|
||||
warning,
|
||||
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
||||
|
|
|
@ -35,59 +35,58 @@
|
|||
|
||||
all() ->
|
||||
[
|
||||
{group, with_batch},
|
||||
{group, without_batch}
|
||||
{group, tcp},
|
||||
{group, tls}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
|
||||
[
|
||||
{with_batch, [
|
||||
{group, sync_query}
|
||||
{tcp, [
|
||||
{group, with_batch},
|
||||
{group, without_batch}
|
||||
]},
|
||||
{without_batch, [
|
||||
{group, sync_query}
|
||||
{tls, [
|
||||
{group, with_batch},
|
||||
{group, without_batch}
|
||||
]},
|
||||
{sync_query, [
|
||||
{group, tcp},
|
||||
{group, tls}
|
||||
]},
|
||||
{tcp, TCs},
|
||||
{tls, TCs}
|
||||
{with_batch, TCs -- NonBatchCases},
|
||||
{without_batch, TCs}
|
||||
].
|
||||
|
||||
init_per_group(tcp, Config0) ->
|
||||
init_per_group(tcp, Config) ->
|
||||
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
|
||||
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
|
||||
Config = [
|
||||
[
|
||||
{mysql_host, MysqlHost},
|
||||
{mysql_port, MysqlPort},
|
||||
{enable_tls, false},
|
||||
{query_mode, sync},
|
||||
{proxy_name, "mysql_tcp"}
|
||||
| Config0
|
||||
],
|
||||
common_init(Config);
|
||||
init_per_group(tls, Config0) ->
|
||||
| Config
|
||||
];
|
||||
init_per_group(tls, Config) ->
|
||||
MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"),
|
||||
MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")),
|
||||
Config = [
|
||||
[
|
||||
{mysql_host, MysqlHost},
|
||||
{mysql_port, MysqlPort},
|
||||
{enable_tls, true},
|
||||
{query_mode, sync},
|
||||
{proxy_name, "mysql_tls"}
|
||||
| Config0
|
||||
],
|
||||
| Config
|
||||
];
|
||||
init_per_group(with_batch, Config0) ->
|
||||
Config = [{enable_batch, true} | Config0],
|
||||
common_init(Config);
|
||||
init_per_group(without_batch, Config0) ->
|
||||
Config = [{enable_batch, false} | Config0],
|
||||
common_init(Config);
|
||||
init_per_group(sync_query, Config) ->
|
||||
[{query_mode, sync} | Config];
|
||||
init_per_group(with_batch, Config) ->
|
||||
[{enable_batch, true} | Config];
|
||||
init_per_group(without_batch, Config) ->
|
||||
[{enable_batch, false} | Config];
|
||||
init_per_group(_Group, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
|
||||
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
|
||||
connect_and_drop_table(Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
|
@ -224,6 +223,25 @@ send_message(Config, Payload) ->
|
|||
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||
emqx_bridge:send_message(BridgeID, Payload).
|
||||
|
||||
query_resource(Config, Request) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ResourceID, Request).
|
||||
|
||||
unprepare(Config, Key) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
|
||||
[
|
||||
begin
|
||||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
ok = mysql:unprepare(Conn, Key)
|
||||
end
|
||||
|| {_Name, Worker} <- ecpool:workers(PoolName)
|
||||
].
|
||||
|
||||
% We need to create and drop the test table outside of using bridges
|
||||
% since a bridge expects the table to exist when enabling it. We
|
||||
% therefore call the mysql module directly, in addition to using it
|
||||
|
@ -392,3 +410,114 @@ t_write_failure(Config) ->
|
|||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
% This test doesn't work with batch enabled since it is not possible
|
||||
% to set the timeout directly for batch queries
|
||||
t_write_timeout(Config) ->
|
||||
ProxyName = ?config(proxy_name, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||
Timeout = 10,
|
||||
?check_trace(
|
||||
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
query_resource(Config, {send_message, SentData, [], Timeout})
|
||||
end),
|
||||
fun(Result, _Trace) ->
|
||||
?assertMatch({error, {resource_error, _}}, Result),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_simple_sql_query(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Request = {sql, <<"SELECT count(1) AS T">>},
|
||||
Result = query_resource(Config, Request),
|
||||
case ?config(enable_batch, Config) of
|
||||
true -> ?assertEqual({error, batch_select_not_implemented}, Result);
|
||||
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_missing_data(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Result = send_message(Config, #{}),
|
||||
case ?config(enable_batch, Config) of
|
||||
true ->
|
||||
?assertMatch(
|
||||
{error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
|
||||
);
|
||||
false ->
|
||||
?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_bad_sql_parameter(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Request = {sql, <<"">>, [bad_parameter]},
|
||||
Result = query_resource(Config, Request),
|
||||
case ?config(enable_batch, Config) of
|
||||
true -> ?assertEqual({error, invalid_request}, Result);
|
||||
false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_unprepared_statement_query(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Request = {prepared_query, unprepared_query, []},
|
||||
Result = query_resource(Config, Request),
|
||||
case ?config(enable_batch, Config) of
|
||||
true -> ?assertEqual({error, invalid_request}, Result);
|
||||
false -> ?assertEqual({error, prepared_statement_is_unprepared}, Result)
|
||||
end,
|
||||
ok.
|
||||
|
||||
%% Test doesn't work with batch enabled since batch doesn't use
|
||||
%% prepared statements as such; it has its own query generation process
|
||||
t_uninitialized_prepared_statement(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||
unprepare(Config, send_message),
|
||||
?check_trace(
|
||||
begin
|
||||
?assertEqual(ok, send_message(Config, SentData)),
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
|
||||
#{
|
||||
?snk_kind := mysql_connector_on_query_prepared_sql,
|
||||
type_or_key := send_message
|
||||
},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace),
|
||||
?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace),
|
||||
ReturnTrace = ?of_kind(mysql_connector_query_return, Trace),
|
||||
?assertMatch([#{result := ok}], ReturnTrace),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue