fix: mysql support prepare sql

This commit is contained in:
DDDHuang 2022-04-19 16:24:46 +08:00
parent 597bdcd4aa
commit 0faf1240f3
1 changed files with 69 additions and 10 deletions

View File

@ -29,7 +29,10 @@
, on_health_check/2
]).
-export([connect/1]).
%% ecpool connect & reconnect
-export([connect/1, prepare_sql_to_conn/1]).
-export([prepare_sql/2]).
-export([roots/0, fields/1]).
@ -91,25 +94,35 @@ on_stop(InstId, #{poolname := PoolName}) ->
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State);
on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta),
case Result = ecpool:pick_and_do(
PoolName,
{mysql, query, [SQL, Params, Timeout]},
{mysql, mysql_function(Type), [SQLOrKey, Params, Timeout]},
no_handover) of
{error, disconnected} ->
?SLOG(error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}),
%% kill the poll worker to trigger reconnection
_ = exit(Conn, restart),
emqx_resource:query_failed(AfterQuery);
{error, Reason} ->
?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
connector => InstId, sql => SQL, reason => Reason}),
?SLOG(error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
emqx_resource:query_failed(AfterQuery);
_ ->
emqx_resource:query_success(AfterQuery)
end,
Result.
mysql_function(sql) -> query;
mysql_function(prepared_query) -> execute.
on_health_check(_InstId, #{poolname := PoolName} = State) ->
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
@ -127,3 +140,49 @@ connect(Options) ->
-> {inet:ip_address() | inet:hostname(), pos_integer()}.
to_server(Str) ->
emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
prepare_sql(Prepares, PoolName) ->
case do_prepare_sql(Prepares, PoolName) of
ok ->
%% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
ok;
{error, R} ->
{error, R}
end.
do_prepare_sql([{PrepareSqlKey, PrepareStatement} | Prepares], PoolName) ->
Workers =
[begin
{ok, Conn} = ecpool_worker:client(Worker),
Conn
end || Worker <- ecpool:workers(PoolName)],
prepare_sql_to_conn_list(Workers, [{PrepareSqlKey, PrepareStatement}]).
prepare_sql_to_conn_list([], PrepareList) -> ok;
prepare_sql_to_conn_list([Conn | ConnList], PrepareList) ->
case prepare_sql_to_conn(Conn, PrepareList) of
ok ->
prepare_sql_to_conn(ConnList, PrepareList);
{error, R} ->
%% rollback
[unprepare_sql_to_conn(Conn, Key) || {Key, _} <- PrepareList],
{error, R}
end.
prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok;
prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) ->
LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey},
?SLOG(info, LogMeta#{prepare => PrepareStatement}),
_ = mysql:unprepare(Conn, PrepareSqlKey),
case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of
{ok, Name} ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList);
{error, Reason} ->
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, Reason};
end.
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
mysql:unprepare(Conn, PrepareSqlKey).