fix: auth mysql prepare query

This commit is contained in:
DDDHuang 2022-04-20 21:47:36 +08:00
parent 94795098c9
commit 365ca67078
2 changed files with 22 additions and 21 deletions

View File

@ -23,6 +23,8 @@
-behaviour(hocon_schema). -behaviour(hocon_schema).
-behaviour(emqx_authentication). -behaviour(emqx_authentication).
-define(PREPARE_KEY, ?MODULE).
-export([ -export([
namespace/0, namespace/0,
roots/0, roots/0,
@ -89,12 +91,11 @@ create(
} = Config } = Config
) -> ) ->
ok = emqx_authn_password_hashing:init(Algorithm), ok = emqx_authn_password_hashing:init(Algorithm),
{PrepareSqlKey, PrepareStatement} = emqx_authn_utils:parse_sql(Query0, '?'), {PrepareSql, TmplToken} = emqx_authn_utils:parse_sql(Query0, '?'),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
State = #{ State = #{
password_hash_algorithm => Algorithm, password_hash_algorithm => Algorithm,
prepare_sql_key => PrepareSqlKey, tmpl_token => TmplToken,
prepare_sql_statement => PrepareStatement,
query_timeout => QueryTimeout, query_timeout => QueryTimeout,
resource_id => ResourceId resource_id => ResourceId
}, },
@ -108,8 +109,7 @@ create(
) )
of of
{ok, _} -> {ok, _} ->
case emqx_resource:query(ResourceId, case emqx_resource:query(ResourceId, {prepare_sql, [{?PREPARE_KEY, PrepareSql}]}) of
{prepare_sql, [{PrepareSqlKey, PrepareStatement}]}) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -133,15 +133,14 @@ authenticate(#{auth_method := _}, _) ->
authenticate( authenticate(
#{password := Password} = Credential, #{password := Password} = Credential,
#{ #{
placeholders := PlaceHolders, tmpl_token := TmplToken,
query := Query,
query_timeout := Timeout, query_timeout := Timeout,
resource_id := ResourceId, resource_id := ResourceId,
password_hash_algorithm := Algorithm password_hash_algorithm := Algorithm
} }
) -> ) ->
Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential), Params = emqx_authn_utils:render_sql_params(TmplToken, Credential),
case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of case emqx_resource:query(ResourceId, {prepared_query, ?PREPARE_KEY, Params, Timeout}) of
{ok, _Columns, []} -> {ok, _Columns, []} ->
ignore; ignore;
{ok, Columns, [Row | _]} -> {ok, Columns, [Row | _]} ->
@ -160,7 +159,7 @@ authenticate(
?SLOG(error, #{ ?SLOG(error, #{
msg => "mysql_query_failed", msg => "mysql_query_failed",
resource => ResourceId, resource => ResourceId,
query => Query, tmpl_token => TmplToken,
params => Params, params => Params,
timeout => Timeout, timeout => Timeout,
reason => Reason reason => Reason

View File

@ -104,8 +104,10 @@ on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName}
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta), ?TRACE("QUERY", "mysql_connector_received", LogMeta),
Conn = ecpool:get_client(PoolName), Worker = ecpool:get_client(PoolName),
Result = erlang:apply(mysql, mysql_function(Type), [Conn, SQLOrKey, Params, Timeout]), {ok, Conn} = ecpool_worker:client(Worker),
MySqlFunction = mysql_function(Type),
Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey, Params, Timeout]),
case Result of case Result of
{error, disconnected} -> {error, disconnected} ->
?SLOG(error, ?SLOG(error,
@ -154,12 +156,12 @@ prepare_sql(Prepares, PoolName) ->
end. end.
do_prepare_sql(Prepares, PoolName) -> do_prepare_sql(Prepares, PoolName) ->
Workers = Conns =
[begin [begin
{ok, Conn} = ecpool_worker:client(Worker), {ok, Conn} = ecpool_worker:client(Worker),
Conn Conn
end || Worker <- ecpool:workers(PoolName)], end || {_Name, Worker} <- ecpool:workers(PoolName)],
prepare_sql_to_conn_list(Workers, Prepares). prepare_sql_to_conn_list(Conns, Prepares).
prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([], _PrepareList) -> ok;
prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> prepare_sql_to_conn_list([Conn | ConnList], PrepareList) ->
@ -174,12 +176,12 @@ prepare_sql_to_conn_list([Conn | ConnList], PrepareList) ->
end. end.
prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok;
prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) -> prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey}, LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL},
?SLOG(info, LogMeta#{prepare => PrepareStatement}), ?SLOG(info, LogMeta),
_ = mysql:unprepare(Conn, PrepareSqlKey), _ = unprepare_sql_to_conn(Conn, Key),
case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of case mysql:prepare(Conn, Key, SQL) of
{ok, _Name} -> {ok, _Key} ->
?SLOG(info, LogMeta#{result => success}), ?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList); prepare_sql_to_conn(Conn, PrepareList);
{error, Reason} -> {error, Reason} ->