diff --git a/.ci/docker-compose-file/docker-compose-mysql-tcp.yaml b/.ci/docker-compose-file/docker-compose-mysql-tcp.yaml index 0c7e45f01..4578ff94f 100644 --- a/.ci/docker-compose-file/docker-compose-mysql-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mysql-tcp.yaml @@ -18,4 +18,7 @@ services: - --collation-server=utf8mb4_general_ci - --lower-case-table-names=1 - --max-allowed-packet=128M + # Severely limit maximum number of prepared statements the server must permit + # so that we hit potential resource exhaustion earlier in tests. + - --max-prepared-stmt-count=64 - --skip-symbolic-links diff --git a/.ci/docker-compose-file/docker-compose-mysql-tls.yaml b/.ci/docker-compose-file/docker-compose-mysql-tls.yaml index 67dbec808..83fd4658c 100644 --- a/.ci/docker-compose-file/docker-compose-mysql-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-mysql-tls.yaml @@ -25,6 +25,9 @@ services: - --collation-server=utf8mb4_general_ci - --lower-case-table-names=1 - --max-allowed-packet=128M + # Severely limit maximum number of prepared statements the server must permit + # so that we hit potential resource exhaustion earlier in tests. + - --max-prepared-stmt-count=64 - --ssl-ca=/etc/certs/ca-cert.pem - --ssl-cert=/etc/certs/server-cert.pem - --ssl-key=/etc/certs/server-key.pem diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index e06d6a9d7..ce669c33f 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -392,13 +392,13 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens), - on_sql_query(InstId, query, SQL, [], default_timeout, State). + on_sql_query(InstId, query, SQL, no_params, default_timeout, State). on_sql_query( InstId, SQLFunc, SQLOrKey, - Data, + Params, Timeout, #{poolname := PoolName} = State ) -> @@ -409,9 +409,9 @@ on_sql_query( {ok, Conn} -> ?tp( mysql_connector_send_query, - #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} + #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Params} ), - do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta); + do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta); {error, disconnected} -> ?SLOG( error, @@ -423,8 +423,8 @@ on_sql_query( {error, {recoverable_error, disconnected}} end. -do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> - try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of +do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta) -> + try mysql:SQLFunc(Conn, SQLOrKey, Params, no_filtermap_fun, Timeout) of {error, disconnected} -> ?SLOG( error, @@ -466,7 +466,7 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> error:badarg -> ?SLOG( error, - LogMeta#{msg => "mysql_connector_invalid_params", params => Data} + LogMeta#{msg => "mysql_connector_invalid_params", params => Params} ), - {error, {unrecoverable_error, {invalid_params, Data}}} + {error, {unrecoverable_error, {invalid_params, Params}}} end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index fec85c874..2d3c52312 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -28,6 +28,9 @@ -define(MYSQL_DATABASE, "mqtt"). -define(MYSQL_USERNAME, "root"). -define(MYSQL_PASSWORD, "public"). +-define(MYSQL_POOL_SIZE, 4). + +-define(WORKER_POOL_SIZE, 4). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -168,11 +171,13 @@ mysql_config(BridgeType, Config) -> " database = ~p\n" " username = ~p\n" " password = ~p\n" + " pool_size = ~b\n" " sql = ~p\n" " resource_opts = {\n" " request_timeout = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" + " worker_pool_size = ~b\n" " }\n" " ssl = {\n" " enable = ~w\n" @@ -185,9 +190,11 @@ mysql_config(BridgeType, Config) -> ?MYSQL_DATABASE, ?MYSQL_USERNAME, ?MYSQL_PASSWORD, + ?MYSQL_POOL_SIZE, ?SQL_BRIDGE, BatchSize, QueryMode, + ?WORKER_POOL_SIZE, TlsEnabled ] ), @@ -265,27 +272,26 @@ connect_direct_mysql(Config) -> {ok, Pid} = mysql:start_link(Opts ++ SslOpts), Pid. +query_direct_mysql(Config, Query) -> + Pid = connect_direct_mysql(Config), + try + mysql:query(Pid, Query) + after + mysql:stop(Pid) + end. + % These funs connect and then stop the mysql connection connect_and_create_table(Config) -> - DirectPid = connect_direct_mysql(Config), - ok = mysql:query(DirectPid, ?SQL_CREATE_TABLE), - mysql:stop(DirectPid). + query_direct_mysql(Config, ?SQL_CREATE_TABLE). connect_and_drop_table(Config) -> - DirectPid = connect_direct_mysql(Config), - ok = mysql:query(DirectPid, ?SQL_DROP_TABLE), - mysql:stop(DirectPid). + query_direct_mysql(Config, ?SQL_DROP_TABLE). connect_and_clear_table(Config) -> - DirectPid = connect_direct_mysql(Config), - ok = mysql:query(DirectPid, ?SQL_DELETE), - mysql:stop(DirectPid). + query_direct_mysql(Config, ?SQL_DELETE). connect_and_get_payload(Config) -> - DirectPid = connect_direct_mysql(Config), - Result = mysql:query(DirectPid, ?SQL_SELECT), - mysql:stop(DirectPid), - Result. + query_direct_mysql(Config, ?SQL_SELECT). %%------------------------------------------------------------------------------ %% Testcases @@ -505,6 +511,39 @@ t_bad_sql_parameter(Config) -> end, ok. +t_workload_fits_prepared_statement_limit(Config) -> + N = 50, + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Results = lists:append( + emqx_misc:pmap( + fun(_) -> + [ + begin + Payload = integer_to_binary(erlang:unique_integer()), + Timestamp = erlang:system_time(millisecond), + send_message(Config, #{payload => Payload, timestamp => Timestamp}) + end + || _ <- lists:seq(1, N) + ] + end, + lists:seq(1, ?WORKER_POOL_SIZE * ?MYSQL_POOL_SIZE), + _Timeout = 10_000 + ) + ), + ?assertEqual( + [], + [R || R <- Results, R /= ok] + ), + {ok, _, [[_Var, Count]]} = + query_direct_mysql(Config, "SHOW GLOBAL STATUS LIKE 'Prepared_stmt_count'"), + ?assertEqual( + ?MYSQL_POOL_SIZE, + binary_to_integer(Count) + ). + t_unprepared_statement_query(Config) -> ?assertMatch( {ok, _},