113 lines
4.2 KiB
Erlang
113 lines
4.2 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_connector_mysql).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-behaviour(emqx_resource).
|
|
|
|
%% callbacks of behaviour emqx_resource
|
|
-export([ on_start/2
|
|
, on_stop/2
|
|
, on_query/4
|
|
, on_health_check/2
|
|
, on_jsonify/1
|
|
]).
|
|
|
|
-export([connect/1]).
|
|
|
|
-export([roots/0, fields/1]).
|
|
|
|
-export([do_health_check/1]).
|
|
|
|
%%=====================================================================
|
|
%% Hocon schema
|
|
%% Hocon schema roots: a single `config' root whose type is a reference to
%% the `config' struct declared by fields/1 in this module.
roots() ->
    ConfigRef = hoconsc:ref(?MODULE, config),
    [{config, #{type => ConfigRef}}].
|
|
|
|
%% Fields of the `config' struct: the relational-database fields followed by
%% the SSL fields, both supplied by emqx_connector_schema_lib.
fields(config) ->
    DbFields = emqx_connector_schema_lib:relational_db_fields(),
    SslFields = emqx_connector_schema_lib:ssl_fields(),
    DbFields ++ SslFields.
|
|
|
|
%%=====================================================================
|
|
|
|
%% Return `Config' with its `server' entry replaced by the value produced by
%% emqx_connector_schema_lib:ip_port_to_string/1. Every other key is kept
%% untouched.
on_jsonify(#{server := Server} = Config) ->
    ServerStr = emqx_connector_schema_lib:ip_port_to_string(Server),
    %% `:=' is safe here: the function head already guarantees the key exists.
    Config#{server := ServerStr}.
|
|
|
|
%% ===================================================================
|
|
%% Start the MySQL connection pool for resource instance `InstId'.
%%
%% `Config' must contain `server' (a `{Host, Port}' tuple), `database',
%% `username', `password', `auto_reconnect' (boolean), `pool_size' and `ssl'
%% (a map holding at least the boolean key `enable').
%%
%% Returns `{ok, #{poolname => PoolName}}'; that map is the state later handed
%% to on_stop/2, on_query/4 and on_health_check/2.
on_start(InstId, #{server := {Host, Port},
                   database := DB,
                   username := User,
                   password := Password,
                   auto_reconnect := AutoReconn,
                   pool_size := PoolSize,
                   ssl := SSL} = Config) ->
    %% Mask the password before logging so the plaintext credential never
    %% reaches the log output.
    ?SLOG(info, #{msg => "starting_mysql_connector",
                  connector => InstId,
                  config => Config#{password := <<"******">>}}),
    %% SSL options are only added when SSL is enabled in the config.
    SslOpts = case maps:get(enable, SSL) of
                  true ->
                      [{ssl, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}];
                  false ->
                      []
              end,
    Options = [{host, Host},
               {port, Port},
               {user, User},
               {password, Password},
               {database, DB},
               {auto_reconnect, reconn_interval(AutoReconn)},
               {pool_size, PoolSize}],
    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
    %% NOTE(review): the result of start_pool/3 is discarded; presumably it
    %% raises on failure -- confirm against emqx_plugin_libs_pool.
    _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
    {ok, #{poolname => PoolName}}.
|
|
|
|
%% Stop the connection pool recorded in the resource state. Logs the stop at
%% info level, then returns whatever emqx_plugin_libs_pool:stop_pool/1 returns.
on_stop(InstId, #{poolname := Pool}) ->
    ?SLOG(info, #{msg => "stopping_mysql_connector", connector => InstId}),
    emqx_plugin_libs_pool:stop_pool(Pool).
|
|
|
|
%% Run an SQL query on a worker picked from the instance's pool.
%%
%% Accepted request shapes:
%%   {sql, SQL}                  -- no parameters, `default_timeout'
%%   {sql, SQL, Params}          -- `default_timeout'
%%   {sql, SQL, Params, Timeout}
%% The two short forms are normalised to the 4-tuple form.
%%
%% The raw result of the pool call is returned to the caller. `AfterQuery' is
%% reported through emqx_resource:query_failed/1 when the result is
%% `{error, Reason}' and through emqx_resource:query_success/1 otherwise.
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _} = State) ->
    on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _} = State) ->
    on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
    ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
    %% MFA handed to ecpool; `no_handover' is passed through unchanged.
    QueryMFA = {mysql, query, [SQL, Params, Timeout]},
    Result = ecpool:pick_and_do(PoolName, QueryMFA, no_handover),
    case Result of
        {error, Reason} ->
            ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
                           connector => InstId, sql => SQL, reason => Reason}),
            emqx_resource:query_failed(AfterQuery);
        _ ->
            %% Anything that is not an `{error, _}' tuple counts as success.
            emqx_resource:query_success(AfterQuery)
    end,
    Result.
|
|
|
|
%% Health-check callback: delegates to emqx_plugin_libs_pool:health_check/3,
%% passing do_health_check/1 (as an external fun) as the check function along
%% with the current resource state.
on_health_check(_InstId, #{poolname := Pool} = State) ->
    CheckFun = fun ?MODULE:do_health_check/1,
    emqx_plugin_libs_pool:health_check(Pool, CheckFun, State).
|
|
|
|
%% Probe a single connection with a trivial SELECT.
%%
%% Returns `true' when the first element of the reply tuple is `ok' and
%% `false' for any other tuple (e.g. `{error, Reason}'). A non-tuple reply
%% makes element/2 raise `badarg'.
do_health_check(Conn) ->
    Reply = mysql:query(Conn, <<"SELECT count(1) AS T">>),
    element(1, Reply) =:= ok.
|
|
|
|
%% ===================================================================
|
|
%% Translate the boolean `auto_reconnect' setting into the value placed in the
%% pool options: 15 when enabled, `false' when disabled.
%% NOTE(review): 15 is presumably a reconnect interval in seconds -- confirm
%% against the pool library's `auto_reconnect' option.
reconn_interval(true) ->
    15;
reconn_interval(false) ->
    false.
|
|
|
|
%% Open one MySQL connection by handing the option list straight to
%% mysql:start_link/1 and returning its result.
%% NOTE(review): presumably invoked by the pool for each worker, since
%% on_start/2 passes ?MODULE to start_pool/3 -- confirm.
connect(Opts) ->
    mysql:start_link(Opts).
|