diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b7cfd84b6..4e3a0ed22 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -419,7 +419,7 @@ on_sql_query( {ok, Conn} = ecpool_worker:client(Worker), ?tp( mysql_connector_send_query, - #{sql_or_key => SQLOrKey, data => Data} + #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} ), try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of {error, disconnected} = Result -> @@ -431,6 +431,10 @@ on_sql_query( _ = exit(Conn, restart), Result; {error, not_prepared} = Error -> + ?tp( + mysql_connector_prepare_query_failed, + #{error => not_prepared} + ), ?SLOG( warning, LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 292c02580..95f919f1c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -35,59 +35,58 @@ all() -> [ - {group, with_batch}, - {group, without_batch} + {group, tcp}, + {group, tls} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), + NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement], [ - {with_batch, [ - {group, sync_query} + {tcp, [ + {group, with_batch}, + {group, without_batch} ]}, - {without_batch, [ - {group, sync_query} + {tls, [ + {group, with_batch}, + {group, without_batch} ]}, - {sync_query, [ - {group, tcp}, - {group, tls} - ]}, - {tcp, TCs}, - {tls, TCs} + {with_batch, TCs -- NonBatchCases}, + {without_batch, TCs} ]. -init_per_group(tcp, Config0) -> +init_per_group(tcp, Config) -> MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"), MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")), - Config = [ + [ {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, false}, + {query_mode, sync}, {proxy_name, "mysql_tcp"} - | Config0 - ], - common_init(Config); -init_per_group(tls, Config0) -> + | Config + ]; +init_per_group(tls, Config) -> MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"), MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")), - Config = [ + [ {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, true}, + {query_mode, sync}, {proxy_name, "mysql_tls"} - | Config0 - ], + | Config + ]; +init_per_group(with_batch, Config0) -> + Config = [{enable_batch, true} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{enable_batch, false} | Config0], common_init(Config); -init_per_group(sync_query, Config) -> - [{query_mode, sync} | Config]; -init_per_group(with_batch, Config) -> - [{enable_batch, true} | Config]; -init_per_group(without_batch, Config) -> - [{enable_batch, false} | Config]; init_per_group(_Group, Config) -> Config. -end_per_group(Group, Config) when Group =:= tcp; Group =:= tls -> +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> connect_and_drop_table(Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), @@ -224,6 +223,25 @@ send_message(Config, Payload) -> BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), emqx_bridge:send_message(BridgeID, Payload). +query_resource(Config, Request) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request). + +unprepare(Config, Key) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + [ + begin + {ok, Conn} = ecpool_worker:client(Worker), + ok = mysql:unprepare(Conn, Key) + end + || {_Name, Worker} <- ecpool:workers(PoolName) + ]. + % We need to create and drop the test table outside of using bridges % since a bridge expects the table to exist when enabling it. We % therefore call the mysql module directly, in addition to using it @@ -392,3 +410,114 @@ t_write_failure(Config) -> end ), ok. + +% This test doesn't work with batch enabled since it is not possible +% to set the timeout directly for batch queries +t_write_timeout(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 10, + ?check_trace( + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + query_resource(Config, {send_message, SentData, [], Timeout}) + end), + fun(Result, _Trace) -> + ?assertMatch({error, {resource_error, _}}, Result), + ok + end + ), + ok. + +t_simple_sql_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"SELECT count(1) AS T">>}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, batch_select_not_implemented}, Result); + false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) + end, + ok. + +t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{}), + case ?config(enable_batch, Config) of + true -> + ?assertMatch( + {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result + ); + false -> + ?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result) + end, + ok. + +t_bad_sql_parameter(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"">>, [bad_parameter]}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, invalid_request}, Result); + false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result) + end, + ok. + +t_unprepared_statement_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {prepared_query, unprepared_query, []}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, invalid_request}, Result); + false -> ?assertEqual({error, prepared_statement_is_unprepared}, Result) + end, + ok. + +%% Test doesn't work with batch enabled since batch doesn't use +%% prepared statements as such; it has its own query generation process +t_uninitialized_prepared_statement(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + unprepare(Config, send_message), + ?check_trace( + begin + ?assertEqual(ok, send_message(Config, SentData)), + ok + end, + fun(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared}, + #{ + ?snk_kind := mysql_connector_on_query_prepared_sql, + type_or_key := send_message + }, + Trace + ) + ), + SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace), + ?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace), + ReturnTrace = ?of_kind(mysql_connector_query_return, Trace), + ?assertMatch([#{result := ok}], ReturnTrace), + ok + end + ), + ok.