diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 0ed7d282a..816eace0d 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -106,7 +106,7 @@ authenticate(#{password := Password} = Credential, resource_id := ResourceId, password_hash_algorithm := Algorithm}) -> Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {sql, Query, Params}) of + case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> NColumns = [Name || #column{name = Name} <- Columns], diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index 8f1f12690..e33f5c100 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -441,12 +441,12 @@ create_user(Values) -> q(Sql) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql}). + {query, Sql}). q(Sql, Params) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql, Params}). + {query, Sql, Params}). drop_seeds() -> {ok, _, _} = q("DROP TABLE IF EXISTS users"), diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index 4e84fe418..926f6fe3c 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -73,7 +73,7 @@ authorize(Client, PubSub, Topic, query := {Query, Params} } }) -> - case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of + case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> do_authorize(Client, PubSub, Topic, Columns, Rows); diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 92c479f92..a264d3407 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -228,12 +228,12 @@ raw_pgsql_authz_config() -> q(Sql) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql}). + {query, Sql}). insert(Sql, Params) -> {ok, _} = emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql, Params}), + {query, Sql, Params}), ok. init_table() -> diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 58706e950..eadc4e773 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -6,7 +6,7 @@ {deps, [ {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, - {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.6.0"}}}, + {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.1"}}}, %% NOTE: mind poolboy version when updating mongodb-erlang version {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, %% NOTE: mind poolboy version when updating eredis_cluster version diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index f42bed666..2f09ae59a 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -32,7 +32,9 @@ -export([connect/1]). --export([query/3]). +-export([ query/3 + , prepared_query/4 + ]). -export([do_health_check/1]). @@ -60,8 +62,7 @@ on_start(InstId, #{server := {Host, Port}, connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> - [{ssl, [{server_name_indication, disable} | - emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}]; + [{ssl, [emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}]; false -> [] end, Options = [{host, Host}, @@ -80,12 +81,16 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, []}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> +on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> + {Command, Args} = case QueryParams of + {query, SQL} -> {query, [SQL, []]}; + {query, SQL, Params} -> {query, [SQL, Params]}; + {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; + {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} + end, ?SLOG(debug, #{msg => "postgresql connector received sql query", - connector => InstId, sql => SQL, state => State}), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of + connector => InstId, command => Command, args => Args, state => State}), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of {error, Reason} -> ?SLOG(error, #{ msg => "postgresql connector do sql query failed", @@ -115,6 +120,9 @@ connect(Opts) -> query(Conn, SQL, Params) -> epgsql:equery(Conn, SQL, Params). +prepared_query(Conn, Name, SQL, Params) -> + epgsql:prepared_query2(Conn, Name, SQL, Params). + conn_opts(Opts) -> conn_opts(Opts, []). conn_opts([], Acc) ->