From d9d5bc4fae37ce1f2e81981276aa36f87cc909be Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 17 Jun 2021 18:05:43 +0800 Subject: [PATCH] feat(connector): mysql and pgsql query support params --- .../src/connector/emqx_connector_mysql.erl | 4 +++- .../src/connector/emqx_connector_pgsql.erl | 10 ++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx_data_bridge/src/connector/emqx_connector_mysql.erl b/apps/emqx_data_bridge/src/connector/emqx_connector_mysql.erl index e90d5f171..61e13901e 100644 --- a/apps/emqx_data_bridge/src/connector/emqx_connector_mysql.erl +++ b/apps/emqx_data_bridge/src/connector/emqx_connector_mysql.erl @@ -71,8 +71,10 @@ on_stop(InstId, #{poolname := PoolName}) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> + on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); +on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), - case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of + case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of {error, Reason} -> logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), emqx_resource:query_failed(AfterQuery); diff --git a/apps/emqx_data_bridge/src/connector/emqx_connector_pgsql.erl b/apps/emqx_data_bridge/src/connector/emqx_connector_pgsql.erl index 329138ab1..0ff0d6779 100644 --- a/apps/emqx_data_bridge/src/connector/emqx_connector_pgsql.erl +++ b/apps/emqx_data_bridge/src/connector/emqx_connector_pgsql.erl @@ -31,7 +31,7 @@ -export([connect/1]). --export([query/2]). +-export([query/3]). -export([do_health_check/1]). @@ -74,8 +74,10 @@ on_stop(InstId, #{poolname := PoolName}) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> + on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); +on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL]}, no_handover) of + case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of {error, Reason} -> logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), emqx_resource:query_failed(AfterQuery); @@ -100,8 +102,8 @@ connect(Opts) -> Password = proplists:get_value(password, Opts), epgsql:connect(Host, Username, Password, conn_opts(Opts)). -query(Conn, SQL) -> - epgsql:squery(Conn, SQL). +query(Conn, SQL, Params) -> + epgsql:equery(Conn, SQL, Params). conn_opts(Opts) -> conn_opts(Opts, []).