fix: fix errors caused by concurrent parsing of sql

This commit is contained in:
zhouzb 2022-01-11 09:39:56 +08:00
parent 76695c9652
commit 2791052135
3 changed files with 57 additions and 31 deletions

View File

@ -80,7 +80,7 @@ create(#{query := Query0,
placeholders => PlaceHolders,
password_hash_algorithm => Algorithm,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config) of
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->
@ -101,12 +101,11 @@ update(Config, State) ->
authenticate(#{auth_method := _}, _) ->
ignore;
authenticate(#{password := Password} = Credential,
#{query := Query,
placeholders := PlaceHolders,
#{placeholders := PlaceHolders,
resource_id := ResourceId,
password_hash_algorithm := Algorithm}) ->
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of
case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Params}) of
{ok, _Columns, []} -> ignore;
{ok, Columns, [Row | _]} ->
NColumns = [Name || #column{name = Name} <- Columns],
@ -133,7 +132,6 @@ destroy(#{resource_id := ResourceId}) ->
%% Internal functions
%%------------------------------------------------------------------------------
%% TODO: Support prepare
parse_query(Query) ->
case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
{match, Captured} ->

View File

@ -39,12 +39,19 @@
description() ->
"AuthZ with Postgresql".
init(#{query := SQL} = Source) ->
case emqx_authz_utils:create_resource(emqx_connector_pgsql, Source) of
{error, Reason} -> error({load_config_error, Reason});
{ok, Id} -> Source#{annotations =>
#{id => Id,
query => parse_query(SQL)}}
init(#{query := SQL0} = Source) ->
{SQL, PlaceHolders} = parse_query(SQL0),
ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
case emqx_resource:create_local(
ResourceID,
emqx_connector_pgsql,
Source#{named_queries => #{ResourceID => SQL}}) of
{ok, _} ->
Source#{annotations =>
#{id => ResourceID,
placeholders => PlaceHolders}};
{error, Reason} ->
error({load_config_error, Reason})
end.
destroy(#{annotations := #{id := Id}}) ->
@ -70,10 +77,10 @@ parse_query(Sql) ->
authorize(Client, PubSub, Topic,
#{annotations := #{id := ResourceID,
query := {Query, Params}
placeholders := Placeholders
}
}) ->
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, replvar(Placeholders, Client)}) of
{ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);

View File

@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("epgsql/include/epgsql.hrl").
-export([roots/0, fields/1]).
@ -33,7 +34,7 @@
-export([connect/1]).
-export([ query/3
, prepared_query/4
, prepared_query/3
]).
-export([do_health_check/1]).
@ -44,12 +45,17 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
[{named_queries, fun named_queries/1}] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(#{server := Server}= Config) ->
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
named_queries(type) -> map();
named_queries(nullable) -> true;
named_queries(_) -> undefined.
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,
@ -57,7 +63,7 @@ on_start(InstId, #{server := {Host, Port},
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL } = Config) ->
ssl := SSL} = Config) ->
?SLOG(info, #{msg => "starting_postgresql_connector",
connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of
@ -74,7 +80,8 @@ on_start(InstId, #{server := {Host, Port},
{password, Password},
{database, DB},
{auto_reconnect, reconn_interval(AutoReconn)},
{pool_size, PoolSize}],
{pool_size, PoolSize},
{named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
{ok, #{poolname => PoolName}}.
@ -84,20 +91,17 @@ on_stop(InstId, #{poolname := PoolName}) ->
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
{Command, Args} = case QueryParams of
{query, SQL} -> {query, [SQL, []]};
{query, SQL, Params} -> {query, [SQL, Params]};
{prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
{prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
end,
?TRACE("QUERY", "postgresql_connector_received",
#{connector => InstId, command => Command, args => Args, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State);
on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
?SLOG(debug, #{msg => "postgresql connector received sql query",
connector => InstId, sql => NameOrSQL, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of
{error, Reason} ->
?SLOG(error, #{
msg => "postgresql_connector_do_sql_query_failed",
connector => InstId, sql => SQL, reason => Reason}),
msg => "postgresql connector do sql query failed",
connector => InstId, sql => NameOrSQL, reason => Reason}),
emqx_resource:query_failed(AfterQuery);
_ ->
emqx_resource:query_success(AfterQuery)
@ -118,13 +122,30 @@ connect(Opts) ->
Host = proplists:get_value(host, Opts),
Username = proplists:get_value(username, Opts),
Password = proplists:get_value(password, Opts),
epgsql:connect(Host, Username, Password, conn_opts(Opts)).
NamedQueries = proplists:get_value(named_queries, Opts),
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
{ok, Conn} ->
parse(Conn, NamedQueries),
{ok, Conn};
{error, Reason} ->
{error, Reason}
end.
query(Conn, SQL, Params) ->
epgsql:equery(Conn, SQL, Params).
prepared_query(Conn, Name, SQL, Params) ->
epgsql:prepared_query2(Conn, Name, SQL, Params).
prepared_query(Conn, Name, Params) ->
epgsql:prepared_query2(Conn, Name, Params).
parse(_Conn, []) ->
ok;
parse(Conn, [{Name, Query} | More]) ->
case epgsql:parse2(Conn, Name, Query, []) of
{ok, _Statement} ->
parse(Conn, More);
Other ->
Other
end.
conn_opts(Opts) ->
conn_opts(Opts, []).