From 23ac426608b658f1d73fda6f55e8d03ebf0660ec Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Tue, 13 Dec 2022 09:14:04 +0100 Subject: [PATCH 1/6] fix(emqx_connector): check for key among prepared statements on query An infinite loop was triggered in the mysql connector when a query used a prepared statement key that was not among the defined prepared statements on start. We now check that the key is defined among the prepared statements before recursing. It seems that this bug was never triggered in any production code flow and simply found while writing tests. An error return spell fix is also included as well as a FIXME comment regarding running mysql:prepare and not distinguishing between transient failures and syntax errors. Syntax errors should not be retried. --- apps/emqx_connector/src/emqx_connector_mysql.erl | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index fc3068c66..b7cfd84b6 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -149,8 +149,12 @@ on_query( {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of {error, not_prepared} -> - case prepare_sql(Prepares, PoolName) of + case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of ok -> + ?tp( + mysql_connector_on_query_prepared_sql, + #{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params} + ), %% not return result, next loop will try again on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State); {error, Reason} -> @@ -182,7 +186,7 @@ on_batch_query( Request -> LogMeta = #{connector => InstId, first_request => Request, state => State}, ?SLOG(error, LogMeta#{msg => "invalid request"}), - {error, invald_request} + {error, invalid_request} end. mysql_function(sql) -> @@ -256,6 +260,12 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> end end. +maybe_prepare_sql(SQLOrKey, Prepares, PoolName) -> + case maps:is_key(SQLOrKey, Prepares) of + true -> prepare_sql(Prepares, PoolName); + false -> {error, prepared_statement_is_unprepared} + end. + prepare_sql(Prepares, PoolName) when is_map(Prepares) -> prepare_sql(maps:to_list(Prepares), PoolName); prepare_sql(Prepares, PoolName) -> @@ -305,6 +315,8 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); {error, Reason} -> + % FIXME: we should try to differ on transient failers and + % syntax failures. Retrying syntax failures is not very productive. ?SLOG(error, LogMeta#{result => failed, reason => Reason}), {error, Reason} end. From 7df24000a054090afd8549fbd1caf995e3bff482 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Tue, 13 Dec 2022 15:25:06 +0100 Subject: [PATCH 2/6] test: add more EE mysql bridge test cases --- .../src/emqx_connector_mysql.erl | 6 +- .../test/emqx_ee_bridge_mysql_SUITE.erl | 185 +++++++++++++++--- 2 files changed, 162 insertions(+), 29 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b7cfd84b6..4e3a0ed22 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -419,7 +419,7 @@ on_sql_query( {ok, Conn} = ecpool_worker:client(Worker), ?tp( mysql_connector_send_query, - #{sql_or_key => SQLOrKey, data => Data} + #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} ), try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of {error, disconnected} = Result -> @@ -431,6 +431,10 @@ on_sql_query( _ = exit(Conn, restart), Result; {error, not_prepared} = Error -> + ?tp( + mysql_connector_prepare_query_failed, + #{error => not_prepared} + ), ?SLOG( warning, LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 292c02580..95f919f1c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -35,59 +35,58 @@ all() -> [ - {group, with_batch}, - {group, without_batch} + {group, tcp}, + {group, tls} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), + NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement], [ - {with_batch, [ - {group, sync_query} + {tcp, [ + {group, with_batch}, + {group, without_batch} ]}, - {without_batch, [ - {group, sync_query} + {tls, [ + {group, with_batch}, + {group, without_batch} ]}, - {sync_query, [ - {group, tcp}, - {group, tls} - ]}, - {tcp, TCs}, - {tls, TCs} + {with_batch, TCs -- NonBatchCases}, + {without_batch, TCs} ]. -init_per_group(tcp, Config0) -> +init_per_group(tcp, Config) -> MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"), MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")), - Config = [ + [ {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, false}, + {query_mode, sync}, {proxy_name, "mysql_tcp"} - | Config0 - ], - common_init(Config); -init_per_group(tls, Config0) -> + | Config + ]; +init_per_group(tls, Config) -> MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"), MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")), - Config = [ + [ {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, true}, + {query_mode, sync}, {proxy_name, "mysql_tls"} - | Config0 - ], + | Config + ]; +init_per_group(with_batch, Config0) -> + Config = [{enable_batch, true} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{enable_batch, false} | Config0], common_init(Config); -init_per_group(sync_query, Config) -> - [{query_mode, sync} | Config]; -init_per_group(with_batch, Config) -> - [{enable_batch, true} | Config]; -init_per_group(without_batch, Config) -> - [{enable_batch, false} | Config]; init_per_group(_Group, Config) -> Config. -end_per_group(Group, Config) when Group =:= tcp; Group =:= tls -> +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> connect_and_drop_table(Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), @@ -224,6 +223,25 @@ send_message(Config, Payload) -> BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), emqx_bridge:send_message(BridgeID, Payload). +query_resource(Config, Request) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request). + +unprepare(Config, Key) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + [ + begin + {ok, Conn} = ecpool_worker:client(Worker), + ok = mysql:unprepare(Conn, Key) + end + || {_Name, Worker} <- ecpool:workers(PoolName) + ]. + % We need to create and drop the test table outside of using bridges % since a bridge expects the table to exist when enabling it. We % therefore call the mysql module directly, in addition to using it @@ -392,3 +410,114 @@ t_write_failure(Config) -> end ), ok. + +% This test doesn't work with batch enabled since it is not possible +% to set the timeout directly for batch queries +t_write_timeout(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 10, + ?check_trace( + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + query_resource(Config, {send_message, SentData, [], Timeout}) + end), + fun(Result, _Trace) -> + ?assertMatch({error, {resource_error, _}}, Result), + ok + end + ), + ok. + +t_simple_sql_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"SELECT count(1) AS T">>}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, batch_select_not_implemented}, Result); + false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) + end, + ok. + +t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{}), + case ?config(enable_batch, Config) of + true -> + ?assertMatch( + {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result + ); + false -> + ?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result) + end, + ok. + +t_bad_sql_parameter(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"">>, [bad_parameter]}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, invalid_request}, Result); + false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result) + end, + ok. + +t_unprepared_statement_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {prepared_query, unprepared_query, []}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, invalid_request}, Result); + false -> ?assertEqual({error, prepared_statement_is_unprepared}, Result) + end, + ok. + +%% Test doesn't work with batch enabled since batch doesn't use +%% prepared statements as such; it has its own query generation process +t_uninitialized_prepared_statement(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + unprepare(Config, send_message), + ?check_trace( + begin + ?assertEqual(ok, send_message(Config, SentData)), + ok + end, + fun(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared}, + #{ + ?snk_kind := mysql_connector_on_query_prepared_sql, + type_or_key := send_message + }, + Trace + ) + ), + SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace), + ?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace), + ReturnTrace = ?of_kind(mysql_connector_query_return, Trace), + ?assertMatch([#{result := ok}], ReturnTrace), + ok + end + ), + ok. From d5a41b801e66f970f5f483bc5cde485554f16326 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Tue, 20 Dec 2022 13:57:44 +0100 Subject: [PATCH 3/6] chore: update changes --- changes/v5.0.13-en.md | 2 ++ changes/v5.0.13-zh.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index acd210e71..38ca0a967 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -20,3 +20,5 @@ - Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578). Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing. + +- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 9b74dc5f0..da58e90a2 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -20,3 +20,5 @@ - 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。 在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。 + +- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571). From aab914d65a0d5b2e3333af59ed2fbb74a446bee7 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Wed, 21 Dec 2022 10:13:05 +0100 Subject: [PATCH 4/6] test: refactor EE mysql bridge test case Co-authored-by: Thales Macedo Garitezi --- .../test/emqx_ee_bridge_mysql_SUITE.erl | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 95f919f1c..06d91eaa1 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -421,15 +421,12 @@ t_write_timeout(Config) -> Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, Timeout = 10, - ?check_trace( - emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, _}}, query_resource(Config, {send_message, SentData, [], Timeout}) - end), - fun(Result, _Trace) -> - ?assertMatch({error, {resource_error, _}}, Result), - ok - end - ), + ) + end), ok. t_simple_sql_query(Config) -> From cc47ce0034efc11c6ec112c52247ee369fb0109a Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Wed, 21 Dec 2022 11:31:43 +0100 Subject: [PATCH 5/6] chore: update changes --- changes/v5.0.13-zh.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index da58e90a2..619803dfc 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -21,4 +21,4 @@ - 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。 在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。 -- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571). +- 增强 mysql 查询流程,确保给定的查询语句在 mysql 连接器的预编译语句中 [#9571](https://github.com/emqx/emqx/pull/9571)。 From 13942f5c4962c72f808863dd2e096bebf5144ed9 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 22 Dec 2022 10:29:12 +0100 Subject: [PATCH 6/6] refactor: rename error return in mysql connector --- apps/emqx_connector/src/emqx_connector_mysql.erl | 2 +- lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 4e3a0ed22..634968b09 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -263,7 +263,7 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> maybe_prepare_sql(SQLOrKey, Prepares, PoolName) -> case maps:is_key(SQLOrKey, Prepares) of true -> prepare_sql(Prepares, PoolName); - false -> {error, prepared_statement_is_unprepared} + false -> {error, prepared_statement_invalid} end. prepare_sql(Prepares, PoolName) when is_map(Prepares) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 06d91eaa1..78cb78f8d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -480,7 +480,7 @@ t_unprepared_statement_query(Config) -> Result = query_resource(Config, Request), case ?config(enable_batch, Config) of true -> ?assertEqual({error, invalid_request}, Result); - false -> ?assertEqual({error, prepared_statement_is_unprepared}, Result) + false -> ?assertEqual({error, prepared_statement_invalid}, Result) end, ok.