chore: provide prepared_query and equery

This commit is contained in:
zhouzb 2021-12-28 09:52:31 +08:00
parent fc89fb0f8a
commit d26042703e
5 changed files with 22 additions and 14 deletions

View File

@ -106,7 +106,7 @@ authenticate(#{password := Password} = Credential,
resource_id := ResourceId, resource_id := ResourceId,
password_hash_algorithm := Algorithm}) -> password_hash_algorithm := Algorithm}) ->
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(ResourceId, {sql, Query, Params}) of case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of
{ok, _Columns, []} -> ignore; {ok, _Columns, []} -> ignore;
{ok, Columns, [Row | _]} -> {ok, Columns, [Row | _]} ->
NColumns = [Name || #column{name = Name} <- Columns], NColumns = [Name || #column{name = Name} <- Columns],

View File

@ -441,12 +441,12 @@ create_user(Values) ->
q(Sql) -> q(Sql) ->
emqx_resource:query( emqx_resource:query(
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
{sql, Sql}). {query, Sql}).
q(Sql, Params) -> q(Sql, Params) ->
emqx_resource:query( emqx_resource:query(
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
{sql, Sql, Params}). {query, Sql, Params}).
drop_seeds() -> drop_seeds() ->
{ok, _, _} = q("DROP TABLE IF EXISTS users"), {ok, _, _} = q("DROP TABLE IF EXISTS users"),

View File

@ -73,7 +73,7 @@ authorize(Client, PubSub, Topic,
query := {Query, Params} query := {Query, Params}
} }
}) -> }) ->
case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of
{ok, _Columns, []} -> nomatch; {ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} -> {ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows); do_authorize(Client, PubSub, Topic, Columns, Rows);

View File

@ -228,12 +228,12 @@ raw_pgsql_authz_config() ->
q(Sql) -> q(Sql) ->
emqx_resource:query( emqx_resource:query(
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
{sql, Sql}). {query, Sql}).
insert(Sql, Params) -> insert(Sql, Params) ->
{ok, _} = emqx_resource:query( {ok, _} = emqx_resource:query(
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
{sql, Sql, Params}), {query, Sql, Params}),
ok. ok.
init_table() -> init_table() ->

View File

@ -32,7 +32,9 @@
-export([connect/1]). -export([connect/1]).
-export([query/4]). -export([ query/3
, query/4
]).
-export([do_health_check/1]). -export([do_health_check/1]).
@ -60,8 +62,7 @@ on_start(InstId, #{server := {Host, Port},
connector => InstId, config => Config}), connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of SslOpts = case maps:get(enable, SSL) of
true -> true ->
[{ssl, [{server_name_indication, disable} | [{ssl, [emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
false -> [] false -> []
end, end,
Options = [{host, Host}, Options = [{host, Host},
@ -80,12 +81,16 @@ on_stop(InstId, #{poolname := PoolName}) ->
connector => InstId}), connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, State); {Command, Args} = case QueryParams of
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> {query, SQL} -> {query, [SQL, []]};
{query, SQL, Params} -> {query, [SQL, Params]};
{prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
{prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
end,
?SLOG(debug, #{msg => "postgresql connector received sql query", ?SLOG(debug, #{msg => "postgresql connector received sql query",
connector => InstId, sql => SQL, state => State}), connector => InstId, command => Command, args => Args, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [InstId, SQL, Params]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "postgresql connector do sql query failed", msg => "postgresql connector do sql query failed",
@ -112,6 +117,9 @@ connect(Opts) ->
Password = proplists:get_value(password, Opts), Password = proplists:get_value(password, Opts),
epgsql:connect(Host, Username, Password, conn_opts(Opts)). epgsql:connect(Host, Username, Password, conn_opts(Opts)).
query(Conn, SQL, Params) ->
epgsql:equery(Conn, SQL, Params).
query(Conn, Name, SQL, Params) -> query(Conn, Name, SQL, Params) ->
epgsql:prepared_query2(Conn, Name, SQL, Params). epgsql:prepared_query2(Conn, Name, SQL, Params).