Merge pull request #6712 from tigercl/fix/pgsql
fix: fix errors caused by concurrent parsing of sql
This commit is contained in:
commit
6572785907
|
@ -76,11 +76,10 @@ create(#{query := Query0,
|
||||||
ok = emqx_authn_password_hashing:init(Algorithm),
|
ok = emqx_authn_password_hashing:init(Algorithm),
|
||||||
{Query, PlaceHolders} = parse_query(Query0),
|
{Query, PlaceHolders} = parse_query(Query0),
|
||||||
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
||||||
State = #{query => Query,
|
State = #{placeholders => PlaceHolders,
|
||||||
placeholders => PlaceHolders,
|
|
||||||
password_hash_algorithm => Algorithm,
|
password_hash_algorithm => Algorithm,
|
||||||
resource_id => ResourceId},
|
resource_id => ResourceId},
|
||||||
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config) of
|
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of
|
||||||
{ok, already_created} ->
|
{ok, already_created} ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
|
@ -101,12 +100,11 @@ update(Config, State) ->
|
||||||
authenticate(#{auth_method := _}, _) ->
|
authenticate(#{auth_method := _}, _) ->
|
||||||
ignore;
|
ignore;
|
||||||
authenticate(#{password := Password} = Credential,
|
authenticate(#{password := Password} = Credential,
|
||||||
#{query := Query,
|
#{placeholders := PlaceHolders,
|
||||||
placeholders := PlaceHolders,
|
|
||||||
resource_id := ResourceId,
|
resource_id := ResourceId,
|
||||||
password_hash_algorithm := Algorithm}) ->
|
password_hash_algorithm := Algorithm}) ->
|
||||||
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
|
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
|
||||||
case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of
|
case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Params}) of
|
||||||
{ok, _Columns, []} -> ignore;
|
{ok, _Columns, []} -> ignore;
|
||||||
{ok, Columns, [Row | _]} ->
|
{ok, Columns, [Row | _]} ->
|
||||||
NColumns = [Name || #column{name = Name} <- Columns],
|
NColumns = [Name || #column{name = Name} <- Columns],
|
||||||
|
@ -133,7 +131,6 @@ destroy(#{resource_id := ResourceId}) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% TODO: Support prepare
|
|
||||||
parse_query(Query) ->
|
parse_query(Query) ->
|
||||||
case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
|
case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
|
||||||
{match, Captured} ->
|
{match, Captured} ->
|
||||||
|
|
|
@ -31,10 +31,10 @@
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[{group, require_seeds}, t_create, t_create_invalid, t_parse_query].
|
[{group, require_seeds}, t_create_invalid, t_parse_query].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[{require_seeds, [], [t_authenticate, t_update, t_destroy, t_is_superuser]}].
|
[{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}].
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
|
@ -171,7 +171,7 @@ t_update(_Config) ->
|
||||||
IncorrectConfig =
|
IncorrectConfig =
|
||||||
CorrectConfig#{
|
CorrectConfig#{
|
||||||
query => <<"SELECT password_hash, salt, is_superuser_str as is_superuser
|
query => <<"SELECT password_hash, salt, is_superuser_str as is_superuser
|
||||||
FROM wrong_table where username = ${username} LIMIT 1">>},
|
FROM users where username = ${username} LIMIT 0">>},
|
||||||
|
|
||||||
{ok, _} = emqx:update_config(
|
{ok, _} = emqx:update_config(
|
||||||
?PATH,
|
?PATH,
|
||||||
|
|
|
@ -125,8 +125,7 @@ raw_pgsql_auth_config(SpecificSSLOpts) ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
|
|
||||||
query => <<"SELECT password_hash, salt, is_superuser_str as is_superuser
|
query => <<"SELECT 1">>,
|
||||||
FROM users where username = ${username} LIMIT 1">>,
|
|
||||||
server => pgsql_server(),
|
server => pgsql_server(),
|
||||||
ssl => maps:merge(SSLOpts, SpecificSSLOpts)
|
ssl => maps:merge(SSLOpts, SpecificSSLOpts)
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -39,12 +39,19 @@
|
||||||
description() ->
|
description() ->
|
||||||
"AuthZ with Postgresql".
|
"AuthZ with Postgresql".
|
||||||
|
|
||||||
init(#{query := SQL} = Source) ->
|
init(#{query := SQL0} = Source) ->
|
||||||
case emqx_authz_utils:create_resource(emqx_connector_pgsql, Source) of
|
{SQL, PlaceHolders} = parse_query(SQL0),
|
||||||
{error, Reason} -> error({load_config_error, Reason});
|
ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
|
||||||
{ok, Id} -> Source#{annotations =>
|
case emqx_resource:create_local(
|
||||||
#{id => Id,
|
ResourceID,
|
||||||
query => parse_query(SQL)}}
|
emqx_connector_pgsql,
|
||||||
|
Source#{named_queries => #{ResourceID => SQL}}) of
|
||||||
|
{ok, _} ->
|
||||||
|
Source#{annotations =>
|
||||||
|
#{id => ResourceID,
|
||||||
|
placeholders => PlaceHolders}};
|
||||||
|
{error, Reason} ->
|
||||||
|
error({load_config_error, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
destroy(#{annotations := #{id := Id}}) ->
|
destroy(#{annotations := #{id := Id}}) ->
|
||||||
|
@ -70,10 +77,10 @@ parse_query(Sql) ->
|
||||||
|
|
||||||
authorize(Client, PubSub, Topic,
|
authorize(Client, PubSub, Topic,
|
||||||
#{annotations := #{id := ResourceID,
|
#{annotations := #{id := ResourceID,
|
||||||
query := {Query, Params}
|
placeholders := Placeholders
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of
|
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, replvar(Placeholders, Client)}) of
|
||||||
{ok, _Columns, []} -> nomatch;
|
{ok, _Columns, []} -> nomatch;
|
||||||
{ok, Columns, Rows} ->
|
{ok, Columns, Rows} ->
|
||||||
do_authorize(Client, PubSub, Topic, Columns, Rows);
|
do_authorize(Client, PubSub, Topic, Columns, Rows);
|
||||||
|
|
|
@ -168,7 +168,8 @@ t_pgsql_error(_Config) ->
|
||||||
},
|
},
|
||||||
|
|
||||||
ok = setup_config(
|
ok = setup_config(
|
||||||
#{<<"query">> => <<"SOME INVALID STATEMENT">>}),
|
#{<<"query">> => <<"SELECT permission, action, topic "
|
||||||
|
"FROM acl WHERE clientid = ${username}">>}),
|
||||||
|
|
||||||
ok = emqx_authz_test_lib:test_samples(
|
ok = emqx_authz_test_lib:test_samples(
|
||||||
ClientInfo,
|
ClientInfo,
|
||||||
|
|
|
@ -40,8 +40,10 @@ reset_authorizers(Nomatch, ChacheEnabled) ->
|
||||||
|
|
||||||
setup_config(BaseConfig, SpecialParams) ->
|
setup_config(BaseConfig, SpecialParams) ->
|
||||||
Config = maps:merge(BaseConfig, SpecialParams),
|
Config = maps:merge(BaseConfig, SpecialParams),
|
||||||
{ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
|
case emqx_authz:update(?CMD_REPLACE, [Config]) of
|
||||||
ok.
|
{ok, _} -> ok;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
is_tcp_server_available(Host, Port) ->
|
is_tcp_server_available(Host, Port) ->
|
||||||
case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
|
case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
{emqx, {path, "../emqx"}},
|
{emqx, {path, "../emqx"}},
|
||||||
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
|
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
|
||||||
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
|
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
|
||||||
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.1"}}},
|
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}},
|
||||||
%% NOTE: mind poolboy version when updating mongodb-erlang version
|
%% NOTE: mind poolboy version when updating mongodb-erlang version
|
||||||
{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}},
|
{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}},
|
||||||
%% NOTE: mind poolboy version when updating eredis_cluster version
|
%% NOTE: mind poolboy version when updating eredis_cluster version
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("epgsql/include/epgsql.hrl").
|
||||||
|
|
||||||
-export([roots/0, fields/1]).
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
@ -33,7 +34,7 @@
|
||||||
-export([connect/1]).
|
-export([connect/1]).
|
||||||
|
|
||||||
-export([ query/3
|
-export([ query/3
|
||||||
, prepared_query/4
|
, prepared_query/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([do_health_check/1]).
|
-export([do_health_check/1]).
|
||||||
|
@ -44,12 +45,17 @@ roots() ->
|
||||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||||
|
|
||||||
fields(config) ->
|
fields(config) ->
|
||||||
|
[{named_queries, fun named_queries/1}] ++
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
on_jsonify(#{server := Server}= Config) ->
|
on_jsonify(#{server := Server}= Config) ->
|
||||||
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
|
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
|
||||||
|
|
||||||
|
named_queries(type) -> map();
|
||||||
|
named_queries(nullable) -> true;
|
||||||
|
named_queries(_) -> undefined.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{server := {Host, Port},
|
on_start(InstId, #{server := {Host, Port},
|
||||||
database := DB,
|
database := DB,
|
||||||
|
@ -57,7 +63,7 @@ on_start(InstId, #{server := {Host, Port},
|
||||||
password := Password,
|
password := Password,
|
||||||
auto_reconnect := AutoReconn,
|
auto_reconnect := AutoReconn,
|
||||||
pool_size := PoolSize,
|
pool_size := PoolSize,
|
||||||
ssl := SSL } = Config) ->
|
ssl := SSL} = Config) ->
|
||||||
?SLOG(info, #{msg => "starting_postgresql_connector",
|
?SLOG(info, #{msg => "starting_postgresql_connector",
|
||||||
connector => InstId, config => Config}),
|
connector => InstId, config => Config}),
|
||||||
SslOpts = case maps:get(enable, SSL) of
|
SslOpts = case maps:get(enable, SSL) of
|
||||||
|
@ -74,7 +80,8 @@ on_start(InstId, #{server := {Host, Port},
|
||||||
{password, Password},
|
{password, Password},
|
||||||
{database, DB},
|
{database, DB},
|
||||||
{auto_reconnect, reconn_interval(AutoReconn)},
|
{auto_reconnect, reconn_interval(AutoReconn)},
|
||||||
{pool_size, PoolSize}],
|
{pool_size, PoolSize},
|
||||||
|
{named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}],
|
||||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
|
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
|
||||||
{ok, #{poolname => PoolName}}.
|
{ok, #{poolname => PoolName}}.
|
||||||
|
@ -84,20 +91,17 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
connector => InstId}),
|
connector => InstId}),
|
||||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||||
|
|
||||||
on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
|
on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||||
{Command, Args} = case QueryParams of
|
on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State);
|
||||||
{query, SQL} -> {query, [SQL, []]};
|
|
||||||
{query, SQL, Params} -> {query, [SQL, Params]};
|
on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||||
{prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
|
?SLOG(debug, #{msg => "postgresql connector received sql query",
|
||||||
{prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
|
connector => InstId, sql => NameOrSQL, state => State}),
|
||||||
end,
|
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of
|
||||||
?TRACE("QUERY", "postgresql_connector_received",
|
|
||||||
#{connector => InstId, command => Command, args => Args, state => State}),
|
|
||||||
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "postgresql_connector_do_sql_query_failed",
|
msg => "postgresql connector do sql query failed",
|
||||||
connector => InstId, sql => SQL, reason => Reason}),
|
connector => InstId, sql => NameOrSQL, reason => Reason}),
|
||||||
emqx_resource:query_failed(AfterQuery);
|
emqx_resource:query_failed(AfterQuery);
|
||||||
_ ->
|
_ ->
|
||||||
emqx_resource:query_success(AfterQuery)
|
emqx_resource:query_success(AfterQuery)
|
||||||
|
@ -118,13 +122,32 @@ connect(Opts) ->
|
||||||
Host = proplists:get_value(host, Opts),
|
Host = proplists:get_value(host, Opts),
|
||||||
Username = proplists:get_value(username, Opts),
|
Username = proplists:get_value(username, Opts),
|
||||||
Password = proplists:get_value(password, Opts),
|
Password = proplists:get_value(password, Opts),
|
||||||
epgsql:connect(Host, Username, Password, conn_opts(Opts)).
|
NamedQueries = proplists:get_value(named_queries, Opts),
|
||||||
|
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
|
||||||
|
{ok, Conn} ->
|
||||||
|
case parse(Conn, NamedQueries) of
|
||||||
|
ok -> {ok, Conn};
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
query(Conn, SQL, Params) ->
|
query(Conn, SQL, Params) ->
|
||||||
epgsql:equery(Conn, SQL, Params).
|
epgsql:equery(Conn, SQL, Params).
|
||||||
|
|
||||||
prepared_query(Conn, Name, SQL, Params) ->
|
prepared_query(Conn, Name, Params) ->
|
||||||
epgsql:prepared_query2(Conn, Name, SQL, Params).
|
epgsql:prepared_query2(Conn, Name, Params).
|
||||||
|
|
||||||
|
parse(_Conn, []) ->
|
||||||
|
ok;
|
||||||
|
parse(Conn, [{Name, Query} | More]) ->
|
||||||
|
case epgsql:parse2(Conn, Name, Query, []) of
|
||||||
|
{ok, _Statement} ->
|
||||||
|
parse(Conn, More);
|
||||||
|
Other ->
|
||||||
|
Other
|
||||||
|
end.
|
||||||
|
|
||||||
conn_opts(Opts) ->
|
conn_opts(Opts) ->
|
||||||
conn_opts(Opts, []).
|
conn_opts(Opts, []).
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -73,7 +73,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
# in conflict by ehttpc and emqtt
|
# in conflict by ehttpc and emqtt
|
||||||
{:gun, github: "emqx/gun", tag: "1.3.6", override: true},
|
{:gun, github: "emqx/gun", tag: "1.3.6", override: true},
|
||||||
# in conflict by emqx_connectior and system_monitor
|
# in conflict by emqx_connectior and system_monitor
|
||||||
{:epgsql, github: "emqx/epgsql", tag: "4.7-emqx.1", override: true},
|
{:epgsql, github: "emqx/epgsql", tag: "4.7-emqx.2", override: true},
|
||||||
# in conflict by mongodb and eredis_cluster
|
# in conflict by mongodb and eredis_cluster
|
||||||
{:poolboy, github: "emqx/poolboy", tag: "1.5.2", override: true},
|
{:poolboy, github: "emqx/poolboy", tag: "1.5.2", override: true},
|
||||||
# in conflict by emqx and observer_cli
|
# in conflict by emqx and observer_cli
|
||||||
|
|
Loading…
Reference in New Issue