From 0faf1240f30b8e7899f6688f20f0bbeefa73a429 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 19 Apr 2022 16:24:46 +0800 Subject: [PATCH] fix: mysql support prepare sql --- .../src/emqx_connector_mysql.erl | 79 ++++++++++++++++--- 1 file changed, 69 insertions(+), 10 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index e82999ba7..6b1a37974 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -29,7 +29,10 @@ , on_health_check/2 ]). --export([connect/1]). +%% ecpool connect & reconnect +-export([connect/1, prepare_sql_to_conn/1]). + +-export([prepare_sql/2]). -export([roots/0, fields/1]). @@ -91,25 +94,35 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> - ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}), +on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State); +on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State); +on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> + LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, + ?TRACE("QUERY", "mysql_connector_received", LogMeta), case Result = ecpool:pick_and_do( PoolName, - {mysql, query, [SQL, Params, Timeout]}, + {mysql, mysql_function(Type), [SQLOrKey, Params, Timeout]}, no_handover) of + {error, disconnected} -> + ?SLOG(error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}), + %% kill the poll worker to trigger reconnection + _ = exit(Conn, restart), + emqx_resource:query_failed(AfterQuery); {error, Reason} -> - ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed", - connector => InstId, sql => SQL, reason => Reason}), + ?SLOG(error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) end, Result. +mysql_function(sql) -> query; +mysql_function(prepared_query) -> execute. + on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). @@ -127,3 +140,49 @@ connect(Options) -> -> {inet:ip_address() | inet:hostname(), pos_integer()}. to_server(Str) -> emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS). + +prepare_sql(Prepares, PoolName) -> + case do_prepare_sql(Prepares, PoolName) of + ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + ok; + {error, R} -> + {error, R} + end. + +do_prepare_sql([{PrepareSqlKey, PrepareStatement} | Prepares], PoolName) -> + Workers = + [begin + {ok, Conn} = ecpool_worker:client(Worker), + Conn + end || Worker <- ecpool:workers(PoolName)], + prepare_sql_to_conn_list(Workers, [{PrepareSqlKey, PrepareStatement}]). + +prepare_sql_to_conn_list([], PrepareList) -> ok; +prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> + case prepare_sql_to_conn(Conn, PrepareList) of + ok -> + prepare_sql_to_conn(ConnList, PrepareList); + {error, R} -> + %% rollback + [unprepare_sql_to_conn(Conn, Key) || {Key, _} <- PrepareList], + {error, R} + end. + +prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; +prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) -> + LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey}, + ?SLOG(info, LogMeta#{prepare => PrepareStatement}), + _ = mysql:unprepare(Conn, PrepareSqlKey), + case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of + {ok, Name} -> + ?SLOG(info, LogMeta#{result => success}), + prepare_sql_to_conn(Conn, PrepareList); + {error, Reason} -> + ?SLOG(error, LogMeta#{result => failed, reason => Reason}), + {error, Reason}; + end. + +unprepare_sql_to_conn(Conn, PrepareSqlKey) -> + mysql:unprepare(Conn, PrepareSqlKey).