diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index f431178bc..222dcc4db 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -80,7 +80,7 @@ create(#{query := Query0, placeholders => PlaceHolders, password_hash_algorithm => Algorithm, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config) of + case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of {ok, already_created} -> {ok, State}; {ok, _} -> @@ -101,12 +101,11 @@ update(Config, State) -> authenticate(#{auth_method := _}, _) -> ignore; authenticate(#{password := Password} = Credential, - #{query := Query, - placeholders := PlaceHolders, + #{placeholders := PlaceHolders, resource_id := ResourceId, password_hash_algorithm := Algorithm}) -> Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of + case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Params}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> NColumns = [Name || #column{name = Name} <- Columns], @@ -133,7 +132,6 @@ destroy(#{resource_id := ResourceId}) -> %% Internal functions %%------------------------------------------------------------------------------ -%% TODO: Support prepare parse_query(Query) -> case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of {match, Captured} -> diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index d93d5447c..cfd58c53f 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -39,12 +39,19 @@ description() -> "AuthZ with Postgresql". -init(#{query := SQL} = Source) -> - case emqx_authz_utils:create_resource(emqx_connector_pgsql, Source) of - {error, Reason} -> error({load_config_error, Reason}); - {ok, Id} -> Source#{annotations => - #{id => Id, - query => parse_query(SQL)}} +init(#{query := SQL0} = Source) -> + {SQL, PlaceHolders} = parse_query(SQL0), + ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql), + case emqx_resource:create_local( + ResourceID, + emqx_connector_pgsql, + Source#{named_queries => #{ResourceID => SQL}}) of + {ok, _} -> + Source#{annotations => + #{id => ResourceID, + placeholders => PlaceHolders}}; + {error, Reason} -> + error({load_config_error, Reason}) end. destroy(#{annotations := #{id := Id}}) -> @@ -70,10 +77,10 @@ parse_query(Sql) -> authorize(Client, PubSub, Topic, #{annotations := #{id := ResourceID, - query := {Query, Params} + placeholders := Placeholders } }) -> - case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of + case emqx_resource:query(ResourceID, {prepared_query, ResourceID, replvar(Placeholders, Client)}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> do_authorize(Client, PubSub, Topic, Columns, Rows); diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 0dcddcc68..34f386282 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). -export([roots/0, fields/1]). @@ -33,7 +34,7 @@ -export([connect/1]). -export([ query/3 - , prepared_query/4 + , prepared_query/3 ]). -export([do_health_check/1]). @@ -44,12 +45,17 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> + [{named_queries, fun named_queries/1}] ++ emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). on_jsonify(#{server := Server}= Config) -> Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}. +named_queries(type) -> map(); +named_queries(nullable) -> true; +named_queries(_) -> undefined. + %% =================================================================== on_start(InstId, #{server := {Host, Port}, database := DB, @@ -57,7 +63,7 @@ on_start(InstId, #{server := {Host, Port}, password := Password, auto_reconnect := AutoReconn, pool_size := PoolSize, - ssl := SSL } = Config) -> + ssl := SSL} = Config) -> ?SLOG(info, #{msg => "starting_postgresql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of @@ -74,7 +80,8 @@ on_start(InstId, #{server := {Host, Port}, {password, Password}, {database, DB}, {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize}], + {pool_size, PoolSize}, + {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), {ok, #{poolname => PoolName}}. @@ -84,20 +91,17 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> - {Command, Args} = case QueryParams of - {query, SQL} -> {query, [SQL, []]}; - {query, SQL, Params} -> {query, [SQL, Params]}; - {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; - {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} - end, - ?TRACE("QUERY", "postgresql_connector_received", - #{connector => InstId, command => Command, args => Args, state => State}), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of +on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State); + +on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> + ?SLOG(debug, #{msg => "postgresql connector received sql query", + connector => InstId, sql => NameOrSQL, state => State}), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of {error, Reason} -> ?SLOG(error, #{ - msg => "postgresql_connector_do_sql_query_failed", - connector => InstId, sql => SQL, reason => Reason}), + msg => "postgresql connector do sql query failed", + connector => InstId, sql => NameOrSQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) @@ -118,13 +122,30 @@ connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), Password = proplists:get_value(password, Opts), - epgsql:connect(Host, Username, Password, conn_opts(Opts)). + NamedQueries = proplists:get_value(named_queries, Opts), + case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of + {ok, Conn} -> + parse(Conn, NamedQueries), + {ok, Conn}; + {error, Reason} -> + {error, Reason} + end. query(Conn, SQL, Params) -> epgsql:equery(Conn, SQL, Params). -prepared_query(Conn, Name, SQL, Params) -> - epgsql:prepared_query2(Conn, Name, SQL, Params). +prepared_query(Conn, Name, Params) -> + epgsql:prepared_query2(Conn, Name, Params). + +parse(_Conn, []) -> + ok; +parse(Conn, [{Name, Query} | More]) -> + case epgsql:parse2(Conn, Name, Query, []) of + {ok, _Statement} -> + parse(Conn, More); + Other -> + Other + end. conn_opts(Opts) -> conn_opts(Opts, []).