From 365ca670789dc4f656b58db6e0ce1c564272b5cc Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 20 Apr 2022 21:47:36 +0800 Subject: [PATCH] fix: auth mysql prepare query --- .../src/simple_authn/emqx_authn_mysql.erl | 19 +++++++-------- .../src/emqx_connector_mysql.erl | 24 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 11e8ede76..37043961f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -23,6 +23,8 @@ -behaviour(hocon_schema). -behaviour(emqx_authentication). +-define(PREPARE_KEY, ?MODULE). + -export([ namespace/0, roots/0, @@ -89,12 +91,11 @@ create( } = Config ) -> ok = emqx_authn_password_hashing:init(Algorithm), - {PrepareSqlKey, PrepareStatement} = emqx_authn_utils:parse_sql(Query0, '?'), + {PrepareSql, TmplToken} = emqx_authn_utils:parse_sql(Query0, '?'), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), State = #{ password_hash_algorithm => Algorithm, - prepare_sql_key => PrepareSqlKey, - prepare_sql_statement => PrepareStatement, + tmpl_token => TmplToken, query_timeout => QueryTimeout, resource_id => ResourceId }, @@ -108,8 +109,7 @@ create( ) of {ok, _} -> - case emqx_resource:query(ResourceId, - {prepare_sql, [{PrepareSqlKey, PrepareStatement}]}) of + case emqx_resource:query(ResourceId, {prepare_sql, [{?PREPARE_KEY, PrepareSql}]}) of ok -> {ok, State}; {error, Reason} -> @@ -133,15 +133,14 @@ authenticate(#{auth_method := _}, _) -> authenticate( #{password := Password} = Credential, #{ - placeholders := PlaceHolders, - query := Query, + tmpl_token := TmplToken, query_timeout := Timeout, resource_id := ResourceId, password_hash_algorithm := Algorithm } ) -> - Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of + Params = emqx_authn_utils:render_sql_params(TmplToken, Credential), + case emqx_resource:query(ResourceId, {prepared_query, ?PREPARE_KEY, Params, Timeout}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> @@ -160,7 +159,7 @@ authenticate( ?SLOG(error, #{ msg => "mysql_query_failed", resource => ResourceId, - query => Query, + tmpl_token => TmplToken, params => Params, timeout => Timeout, reason => Reason diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 21e90c929..37534ccff 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -104,8 +104,10 @@ on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - Conn = ecpool:get_client(PoolName), - Result = erlang:apply(mysql, mysql_function(Type), [Conn, SQLOrKey, Params, Timeout]), + Worker = ecpool:get_client(PoolName), + {ok, Conn} = ecpool_worker:client(Worker), + MySqlFunction = mysql_function(Type), + Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey, Params, Timeout]), case Result of {error, disconnected} -> ?SLOG(error, @@ -154,12 +156,12 @@ prepare_sql(Prepares, PoolName) -> end. do_prepare_sql(Prepares, PoolName) -> - Workers = + Conns = [begin {ok, Conn} = ecpool_worker:client(Worker), Conn - end || Worker <- ecpool:workers(PoolName)], - prepare_sql_to_conn_list(Workers, Prepares). + end || {_Name, Worker} <- ecpool:workers(PoolName)], + prepare_sql_to_conn_list(Conns, Prepares). prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> @@ -174,12 +176,12 @@ prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> end. prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; -prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) -> - LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey}, - ?SLOG(info, LogMeta#{prepare => PrepareStatement}), - _ = mysql:unprepare(Conn, PrepareSqlKey), - case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of - {ok, _Name} -> +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> + LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + _ = unprepare_sql_to_conn(Conn, Key), + case mysql:prepare(Conn, Key, SQL) of + {ok, _Key} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); {error, Reason} ->