diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index f431178bc..6589fc8eb 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -76,11 +76,10 @@ create(#{query := Query0, ok = emqx_authn_password_hashing:init(Algorithm), {Query, PlaceHolders} = parse_query(Query0), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), - State = #{query => Query, - placeholders => PlaceHolders, + State = #{placeholders => PlaceHolders, password_hash_algorithm => Algorithm, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config) of + case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of {ok, already_created} -> {ok, State}; {ok, _} -> @@ -101,12 +100,11 @@ update(Config, State) -> authenticate(#{auth_method := _}, _) -> ignore; authenticate(#{password := Password} = Credential, - #{query := Query, - placeholders := PlaceHolders, + #{placeholders := PlaceHolders, resource_id := ResourceId, password_hash_algorithm := Algorithm}) -> Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of + case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Params}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> NColumns = [Name || #column{name = Name} <- Columns], @@ -133,7 +131,6 @@ destroy(#{resource_id := ResourceId}) -> %% Internal functions %%------------------------------------------------------------------------------ -%% TODO: Support prepare parse_query(Query) -> case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of {match, Captured} -> diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index bfccaf128..a2c95314d 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -31,10 +31,10 @@ -define(PATH, [authentication]). all() -> - [{group, require_seeds}, t_create, t_create_invalid, t_parse_query]. + [{group, require_seeds}, t_create_invalid, t_parse_query]. groups() -> - [{require_seeds, [], [t_authenticate, t_update, t_destroy, t_is_superuser]}]. + [{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}]. init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), @@ -171,7 +171,7 @@ t_update(_Config) -> IncorrectConfig = CorrectConfig#{ query => <<"SELECT password_hash, salt, is_superuser_str as is_superuser - FROM wrong_table where username = ${username} LIMIT 1">>}, + FROM users where username = ${username} LIMIT 0">>}, {ok, _} = emqx:update_config( ?PATH, diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl index 51a1ce8dd..0225ed06f 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl @@ -125,8 +125,7 @@ raw_pgsql_auth_config(SpecificSSLOpts) -> username => <<"root">>, password => <<"public">>, - query => <<"SELECT password_hash, salt, is_superuser_str as is_superuser - FROM users where username = ${username} LIMIT 1">>, + query => <<"SELECT 1">>, server => pgsql_server(), ssl => maps:merge(SSLOpts, SpecificSSLOpts) }. diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index d93d5447c..cfd58c53f 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -39,12 +39,19 @@ description() -> "AuthZ with Postgresql". -init(#{query := SQL} = Source) -> - case emqx_authz_utils:create_resource(emqx_connector_pgsql, Source) of - {error, Reason} -> error({load_config_error, Reason}); - {ok, Id} -> Source#{annotations => - #{id => Id, - query => parse_query(SQL)}} +init(#{query := SQL0} = Source) -> + {SQL, PlaceHolders} = parse_query(SQL0), + ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql), + case emqx_resource:create_local( + ResourceID, + emqx_connector_pgsql, + Source#{named_queries => #{ResourceID => SQL}}) of + {ok, _} -> + Source#{annotations => + #{id => ResourceID, + placeholders => PlaceHolders}}; + {error, Reason} -> + error({load_config_error, Reason}) end. destroy(#{annotations := #{id := Id}}) -> @@ -70,10 +77,10 @@ parse_query(Sql) -> authorize(Client, PubSub, Topic, #{annotations := #{id := ResourceID, - query := {Query, Params} + placeholders := Placeholders } }) -> - case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of + case emqx_resource:query(ResourceID, {prepared_query, ResourceID, replvar(Placeholders, Client)}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> do_authorize(Client, PubSub, Topic, Columns, Rows); diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 6ececd3f8..45745326d 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -168,7 +168,8 @@ t_pgsql_error(_Config) -> }, ok = setup_config( - #{<<"query">> => <<"SOME INVALID STATEMENT">>}), + #{<<"query">> => <<"SELECT permission, action, topic " + "FROM acl WHERE clientid = ${username}">>}), ok = emqx_authz_test_lib:test_samples( ClientInfo, diff --git a/apps/emqx_authz/test/emqx_authz_test_lib.erl b/apps/emqx_authz/test/emqx_authz_test_lib.erl index bacbbe10a..b7de5c3da 100644 --- a/apps/emqx_authz/test/emqx_authz_test_lib.erl +++ b/apps/emqx_authz/test/emqx_authz_test_lib.erl @@ -40,8 +40,10 @@ reset_authorizers(Nomatch, ChacheEnabled) -> setup_config(BaseConfig, SpecialParams) -> Config = maps:merge(BaseConfig, SpecialParams), - {ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]), - ok. + case emqx_authz:update(?CMD_REPLACE, [Config]) of + {ok, _} -> ok; + {error, Reason} -> {error, Reason} + end. is_tcp_server_available(Host, Port) -> case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 1865f8f49..a5d453287 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -9,7 +9,7 @@ {emqx, {path, "../emqx"}}, {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, - {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.1"}}}, + {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}}, %% NOTE: mind poolboy version when updating mongodb-erlang version {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}}, %% NOTE: mind poolboy version when updating eredis_cluster version diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 0dcddcc68..90d1c207e 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). -export([roots/0, fields/1]). @@ -33,7 +34,7 @@ -export([connect/1]). -export([ query/3 - , prepared_query/4 + , prepared_query/3 ]). -export([do_health_check/1]). @@ -44,12 +45,17 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> + [{named_queries, fun named_queries/1}] ++ emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). on_jsonify(#{server := Server}= Config) -> Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}. +named_queries(type) -> map(); +named_queries(nullable) -> true; +named_queries(_) -> undefined. + %% =================================================================== on_start(InstId, #{server := {Host, Port}, database := DB, @@ -57,7 +63,7 @@ on_start(InstId, #{server := {Host, Port}, password := Password, auto_reconnect := AutoReconn, pool_size := PoolSize, - ssl := SSL } = Config) -> + ssl := SSL} = Config) -> ?SLOG(info, #{msg => "starting_postgresql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of @@ -74,7 +80,8 @@ on_start(InstId, #{server := {Host, Port}, {password, Password}, {database, DB}, {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize}], + {pool_size, PoolSize}, + {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), {ok, #{poolname => PoolName}}. @@ -84,20 +91,17 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> - {Command, Args} = case QueryParams of - {query, SQL} -> {query, [SQL, []]}; - {query, SQL, Params} -> {query, [SQL, Params]}; - {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; - {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} - end, - ?TRACE("QUERY", "postgresql_connector_received", - #{connector => InstId, command => Command, args => Args, state => State}), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of +on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State); + +on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> + ?SLOG(debug, #{msg => "postgresql connector received sql query", + connector => InstId, sql => NameOrSQL, state => State}), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of {error, Reason} -> ?SLOG(error, #{ - msg => "postgresql_connector_do_sql_query_failed", - connector => InstId, sql => SQL, reason => Reason}), + msg => "postgresql connector do sql query failed", + connector => InstId, sql => NameOrSQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) @@ -118,13 +122,32 @@ connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), Password = proplists:get_value(password, Opts), - epgsql:connect(Host, Username, Password, conn_opts(Opts)). + NamedQueries = proplists:get_value(named_queries, Opts), + case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of + {ok, Conn} -> + case parse(Conn, NamedQueries) of + ok -> {ok, Conn}; + {error, Reason} -> {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. query(Conn, SQL, Params) -> epgsql:equery(Conn, SQL, Params). -prepared_query(Conn, Name, SQL, Params) -> - epgsql:prepared_query2(Conn, Name, SQL, Params). +prepared_query(Conn, Name, Params) -> + epgsql:prepared_query2(Conn, Name, Params). + +parse(_Conn, []) -> + ok; +parse(Conn, [{Name, Query} | More]) -> + case epgsql:parse2(Conn, Name, Query, []) of + {ok, _Statement} -> + parse(Conn, More); + Other -> + Other + end. conn_opts(Opts) -> conn_opts(Opts, []). diff --git a/mix.exs b/mix.exs index ca66f0c6d..aa0aa9d31 100644 --- a/mix.exs +++ b/mix.exs @@ -73,7 +73,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by ehttpc and emqtt {:gun, github: "emqx/gun", tag: "1.3.6", override: true}, # in conflict by emqx_connectior and system_monitor - {:epgsql, github: "emqx/epgsql", tag: "4.7-emqx.1", override: true}, + {:epgsql, github: "emqx/epgsql", tag: "4.7-emqx.2", override: true}, # in conflict by mongodb and eredis_cluster {:poolboy, github: "emqx/poolboy", tag: "1.5.2", override: true}, # in conflict by emqx and observer_cli