Merge pull request #9571 from olcai/improve-mysql-bridge-test
fix: infinite recursion in mysql connector and improve mysql EE bridge tests
This commit is contained in:
commit
c678770532
|
@ -149,8 +149,12 @@ on_query(
|
||||||
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
||||||
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
|
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
|
||||||
{error, not_prepared} ->
|
{error, not_prepared} ->
|
||||||
case prepare_sql(Prepares, PoolName) of
|
case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of
|
||||||
ok ->
|
ok ->
|
||||||
|
?tp(
|
||||||
|
mysql_connector_on_query_prepared_sql,
|
||||||
|
#{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
|
||||||
|
),
|
||||||
%% not return result, next loop will try again
|
%% not return result, next loop will try again
|
||||||
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
|
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -182,7 +186,7 @@ on_batch_query(
|
||||||
Request ->
|
Request ->
|
||||||
LogMeta = #{connector => InstId, first_request => Request, state => State},
|
LogMeta = #{connector => InstId, first_request => Request, state => State},
|
||||||
?SLOG(error, LogMeta#{msg => "invalid request"}),
|
?SLOG(error, LogMeta#{msg => "invalid request"}),
|
||||||
{error, invald_request}
|
{error, invalid_request}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mysql_function(sql) ->
|
mysql_function(sql) ->
|
||||||
|
@ -256,6 +260,12 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
|
||||||
|
case maps:is_key(SQLOrKey, Prepares) of
|
||||||
|
true -> prepare_sql(Prepares, PoolName);
|
||||||
|
false -> {error, prepared_statement_invalid}
|
||||||
|
end.
|
||||||
|
|
||||||
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
|
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
|
||||||
prepare_sql(maps:to_list(Prepares), PoolName);
|
prepare_sql(maps:to_list(Prepares), PoolName);
|
||||||
prepare_sql(Prepares, PoolName) ->
|
prepare_sql(Prepares, PoolName) ->
|
||||||
|
@ -305,6 +315,8 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
|
||||||
?SLOG(info, LogMeta#{result => success}),
|
?SLOG(info, LogMeta#{result => success}),
|
||||||
prepare_sql_to_conn(Conn, PrepareList);
|
prepare_sql_to_conn(Conn, PrepareList);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
% FIXME: we should try to differ on transient failers and
|
||||||
|
% syntax failures. Retrying syntax failures is not very productive.
|
||||||
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
|
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -407,7 +419,7 @@ on_sql_query(
|
||||||
{ok, Conn} = ecpool_worker:client(Worker),
|
{ok, Conn} = ecpool_worker:client(Worker),
|
||||||
?tp(
|
?tp(
|
||||||
mysql_connector_send_query,
|
mysql_connector_send_query,
|
||||||
#{sql_or_key => SQLOrKey, data => Data}
|
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
|
||||||
),
|
),
|
||||||
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
|
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
|
||||||
{error, disconnected} = Result ->
|
{error, disconnected} = Result ->
|
||||||
|
@ -419,6 +431,10 @@ on_sql_query(
|
||||||
_ = exit(Conn, restart),
|
_ = exit(Conn, restart),
|
||||||
Result;
|
Result;
|
||||||
{error, not_prepared} = Error ->
|
{error, not_prepared} = Error ->
|
||||||
|
?tp(
|
||||||
|
mysql_connector_prepare_query_failed,
|
||||||
|
#{error => not_prepared}
|
||||||
|
),
|
||||||
?SLOG(
|
?SLOG(
|
||||||
warning,
|
warning,
|
||||||
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
||||||
|
|
|
@ -20,3 +20,5 @@
|
||||||
|
|
||||||
- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578).
|
- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578).
|
||||||
Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing.
|
Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing.
|
||||||
|
|
||||||
|
- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571).
|
||||||
|
|
|
@ -20,3 +20,5 @@
|
||||||
|
|
||||||
- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。
|
- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。
|
||||||
在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。
|
在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。
|
||||||
|
|
||||||
|
- 增强 mysql 查询流程,确保给定的查询语句在 mysql 连接器的预编译语句中 [#9571](https://github.com/emqx/emqx/pull/9571)。
|
||||||
|
|
|
@ -35,59 +35,58 @@
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
{group, with_batch},
|
{group, tcp},
|
||||||
{group, without_batch}
|
{group, tls}
|
||||||
].
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
|
||||||
[
|
[
|
||||||
{with_batch, [
|
{tcp, [
|
||||||
{group, sync_query}
|
{group, with_batch},
|
||||||
|
{group, without_batch}
|
||||||
]},
|
]},
|
||||||
{without_batch, [
|
{tls, [
|
||||||
{group, sync_query}
|
{group, with_batch},
|
||||||
|
{group, without_batch}
|
||||||
]},
|
]},
|
||||||
{sync_query, [
|
{with_batch, TCs -- NonBatchCases},
|
||||||
{group, tcp},
|
{without_batch, TCs}
|
||||||
{group, tls}
|
|
||||||
]},
|
|
||||||
{tcp, TCs},
|
|
||||||
{tls, TCs}
|
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_group(tcp, Config0) ->
|
init_per_group(tcp, Config) ->
|
||||||
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
|
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
|
||||||
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
|
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
|
||||||
Config = [
|
[
|
||||||
{mysql_host, MysqlHost},
|
{mysql_host, MysqlHost},
|
||||||
{mysql_port, MysqlPort},
|
{mysql_port, MysqlPort},
|
||||||
{enable_tls, false},
|
{enable_tls, false},
|
||||||
|
{query_mode, sync},
|
||||||
{proxy_name, "mysql_tcp"}
|
{proxy_name, "mysql_tcp"}
|
||||||
| Config0
|
| Config
|
||||||
],
|
];
|
||||||
common_init(Config);
|
init_per_group(tls, Config) ->
|
||||||
init_per_group(tls, Config0) ->
|
|
||||||
MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"),
|
MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"),
|
||||||
MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")),
|
MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")),
|
||||||
Config = [
|
[
|
||||||
{mysql_host, MysqlHost},
|
{mysql_host, MysqlHost},
|
||||||
{mysql_port, MysqlPort},
|
{mysql_port, MysqlPort},
|
||||||
{enable_tls, true},
|
{enable_tls, true},
|
||||||
|
{query_mode, sync},
|
||||||
{proxy_name, "mysql_tls"}
|
{proxy_name, "mysql_tls"}
|
||||||
| Config0
|
| Config
|
||||||
],
|
];
|
||||||
|
init_per_group(with_batch, Config0) ->
|
||||||
|
Config = [{enable_batch, true} | Config0],
|
||||||
|
common_init(Config);
|
||||||
|
init_per_group(without_batch, Config0) ->
|
||||||
|
Config = [{enable_batch, false} | Config0],
|
||||||
common_init(Config);
|
common_init(Config);
|
||||||
init_per_group(sync_query, Config) ->
|
|
||||||
[{query_mode, sync} | Config];
|
|
||||||
init_per_group(with_batch, Config) ->
|
|
||||||
[{enable_batch, true} | Config];
|
|
||||||
init_per_group(without_batch, Config) ->
|
|
||||||
[{enable_batch, false} | Config];
|
|
||||||
init_per_group(_Group, Config) ->
|
init_per_group(_Group, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
|
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
|
||||||
connect_and_drop_table(Config),
|
connect_and_drop_table(Config),
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
@ -224,6 +223,25 @@ send_message(Config, Payload) ->
|
||||||
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||||
emqx_bridge:send_message(BridgeID, Payload).
|
emqx_bridge:send_message(BridgeID, Payload).
|
||||||
|
|
||||||
|
query_resource(Config, Request) ->
|
||||||
|
Name = ?config(mysql_name, Config),
|
||||||
|
BridgeType = ?config(mysql_bridge_type, Config),
|
||||||
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||||
|
emqx_resource:query(ResourceID, Request).
|
||||||
|
|
||||||
|
unprepare(Config, Key) ->
|
||||||
|
Name = ?config(mysql_name, Config),
|
||||||
|
BridgeType = ?config(mysql_bridge_type, Config),
|
||||||
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||||
|
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
|
||||||
|
[
|
||||||
|
begin
|
||||||
|
{ok, Conn} = ecpool_worker:client(Worker),
|
||||||
|
ok = mysql:unprepare(Conn, Key)
|
||||||
|
end
|
||||||
|
|| {_Name, Worker} <- ecpool:workers(PoolName)
|
||||||
|
].
|
||||||
|
|
||||||
% We need to create and drop the test table outside of using bridges
|
% We need to create and drop the test table outside of using bridges
|
||||||
% since a bridge expects the table to exist when enabling it. We
|
% since a bridge expects the table to exist when enabling it. We
|
||||||
% therefore call the mysql module directly, in addition to using it
|
% therefore call the mysql module directly, in addition to using it
|
||||||
|
@ -392,3 +410,111 @@ t_write_failure(Config) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
% This test doesn't work with batch enabled since it is not possible
|
||||||
|
% to set the timeout directly for batch queries
|
||||||
|
t_write_timeout(Config) ->
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
{ok, _} = create_bridge(Config),
|
||||||
|
Val = integer_to_binary(erlang:unique_integer()),
|
||||||
|
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||||
|
Timeout = 10,
|
||||||
|
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
?assertMatch(
|
||||||
|
{error, {resource_error, _}},
|
||||||
|
query_resource(Config, {send_message, SentData, [], Timeout})
|
||||||
|
)
|
||||||
|
end),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_simple_sql_query(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
Request = {sql, <<"SELECT count(1) AS T">>},
|
||||||
|
Result = query_resource(Config, Request),
|
||||||
|
case ?config(enable_batch, Config) of
|
||||||
|
true -> ?assertEqual({error, batch_select_not_implemented}, Result);
|
||||||
|
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_missing_data(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
Result = send_message(Config, #{}),
|
||||||
|
case ?config(enable_batch, Config) of
|
||||||
|
true ->
|
||||||
|
?assertMatch(
|
||||||
|
{error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
|
||||||
|
);
|
||||||
|
false ->
|
||||||
|
?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_bad_sql_parameter(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
Request = {sql, <<"">>, [bad_parameter]},
|
||||||
|
Result = query_resource(Config, Request),
|
||||||
|
case ?config(enable_batch, Config) of
|
||||||
|
true -> ?assertEqual({error, invalid_request}, Result);
|
||||||
|
false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_unprepared_statement_query(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
Request = {prepared_query, unprepared_query, []},
|
||||||
|
Result = query_resource(Config, Request),
|
||||||
|
case ?config(enable_batch, Config) of
|
||||||
|
true -> ?assertEqual({error, invalid_request}, Result);
|
||||||
|
false -> ?assertEqual({error, prepared_statement_invalid}, Result)
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% Test doesn't work with batch enabled since batch doesn't use
|
||||||
|
%% prepared statements as such; it has its own query generation process
|
||||||
|
t_uninitialized_prepared_statement(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
Val = integer_to_binary(erlang:unique_integer()),
|
||||||
|
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||||
|
unprepare(Config, send_message),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assert(
|
||||||
|
?strict_causality(
|
||||||
|
#{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
|
||||||
|
#{
|
||||||
|
?snk_kind := mysql_connector_on_query_prepared_sql,
|
||||||
|
type_or_key := send_message
|
||||||
|
},
|
||||||
|
Trace
|
||||||
|
)
|
||||||
|
),
|
||||||
|
SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace),
|
||||||
|
?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace),
|
||||||
|
ReturnTrace = ?of_kind(mysql_connector_query_return, Trace),
|
||||||
|
?assertMatch([#{result := ok}], ReturnTrace),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue