diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6b1a37974..f662bbe5b 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -30,7 +30,7 @@ ]). %% ecpool connect & reconnect --export([connect/1, prepare_sql_to_conn/1]). +-export([connect/1, prepare_sql_to_conn/2]). -export([prepare_sql/2]). @@ -101,10 +101,9 @@ on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - case Result = ecpool:pick_and_do( - PoolName, - {mysql, mysql_function(Type), [SQLOrKey, Params, Timeout]}, - no_handover) of + Conn = ecpool:get_client(PoolName), + Result = erlang:apply(mysql, mysql_function(Type), [Conn, SQLOrKey, Params, Timeout]), + case Result of {error, disconnected} -> ?SLOG(error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}), @@ -151,15 +150,15 @@ prepare_sql(Prepares, PoolName) -> {error, R} end. -do_prepare_sql([{PrepareSqlKey, PrepareStatement} | Prepares], PoolName) -> +do_prepare_sql(Prepares, PoolName) -> Workers = [begin {ok, Conn} = ecpool_worker:client(Worker), Conn end || Worker <- ecpool:workers(PoolName)], - prepare_sql_to_conn_list(Workers, [{PrepareSqlKey, PrepareStatement}]). + prepare_sql_to_conn_list(Workers, Prepares). -prepare_sql_to_conn_list([], PrepareList) -> ok; +prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> case prepare_sql_to_conn(Conn, PrepareList) of ok -> @@ -176,12 +175,12 @@ prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) whe ?SLOG(info, LogMeta#{prepare => PrepareStatement}), _ = mysql:unprepare(Conn, PrepareSqlKey), case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of - {ok, Name} -> + {ok, _Name} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); {error, Reason} -> ?SLOG(error, LogMeta#{result => failed, reason => Reason}), - {error, Reason}; + {error, Reason} end. unprepare_sql_to_conn(Conn, PrepareSqlKey) ->