feat(connector): mysql and pgsql query support params

This commit is contained in:
zhanghongtong 2021-06-17 18:05:43 +08:00 committed by Rory Z
parent d640e2ccfa
commit d9d5bc4fae
2 changed files with 9 additions and 5 deletions

View File

@ -71,8 +71,10 @@ on_stop(InstId, #{poolname := PoolName}) ->
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);

View File

@ -31,7 +31,7 @@
-export([connect/1]). -export([connect/1]).
-export([query/2]). -export([query/3]).
-export([do_health_check/1]). -export([do_health_check/1]).
@ -74,8 +74,10 @@ on_stop(InstId, #{poolname := PoolName}) ->
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
@ -100,8 +102,8 @@ connect(Opts) ->
Password = proplists:get_value(password, Opts), Password = proplists:get_value(password, Opts),
epgsql:connect(Host, Username, Password, conn_opts(Opts)). epgsql:connect(Host, Username, Password, conn_opts(Opts)).
query(Conn, SQL) -> query(Conn, SQL, Params) ->
epgsql:squery(Conn, SQL). epgsql:equery(Conn, SQL, Params).
conn_opts(Opts) -> conn_opts(Opts) ->
conn_opts(Opts, []). conn_opts(Opts, []).