emqx/apps/emqx_connector/src/emqx_connector_mysql.erl

113 lines
4.2 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mysql).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
-export([connect/1]).
-export([roots/0, fields/1]).
-export([do_health_check/1]).
%%=====================================================================
%% Hocon schema
%% Hocon schema entry point: exposes a single root named config whose
%% type is a reference to this module's config field set.
roots() ->
    ConfigSpec = #{type => hoconsc:ref(?MODULE, config)},
    [{config, ConfigSpec}].
%% Field list for the config struct: the relational-database fields
%% followed by the SSL fields, both taken from emqx_connector_schema_lib.
fields(config) ->
    DbFields = emqx_connector_schema_lib:relational_db_fields(),
    SslFields = emqx_connector_schema_lib:ssl_fields(),
    DbFields ++ SslFields.
%%=====================================================================
%% Returns Config with its server entry replaced by the result of
%% emqx_connector_schema_lib:ip_port_to_string/1 applied to the original value.
on_jsonify(#{server := Server} = Config) ->
    ServerStr = emqx_connector_schema_lib:ip_port_to_string(Server),
    %% The head match guarantees the key exists, so an update-only ':=' is safe.
    Config#{server := ServerStr}.
%% ===================================================================
%% Starts the MySQL connection pool for resource instance InstId.
%%
%% Config must contain server ({Host, Port}), database, username, password,
%% auto_reconnect (boolean), pool_size and ssl (a map with an 'enable' key).
%% When SSL is enabled, the options returned by
%% emqx_plugin_libs_ssl:save_files_return_opts/3 are appended as {ssl, Opts}.
%%
%% Returns {ok, #{poolname => PoolName}}; that map is the state later handed
%% to on_stop/2, on_query/4 and on_health_check/2.
on_start(InstId, #{server := {Host, Port},
                   database := DB,
                   username := User,
                   password := Password,
                   auto_reconnect := AutoReconn,
                   pool_size := PoolSize,
                   ssl := SSL} = Config) ->
    %% Mask the database password so the credential never reaches the log.
    ?SLOG(info, #{msg => "starting_mysql_connector",
                  connector => InstId,
                  config => Config#{password := <<"******">>}}),
    SslOpts = case maps:get(enable, SSL) of
                  true ->
                      [{ssl, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}];
                  false ->
                      []
              end,
    Options = [{host, Host},
               {port, Port},
               {user, User},
               {password, Password},
               {database, DB},
               {auto_reconnect, reconn_interval(AutoReconn)},
               {pool_size, PoolSize}],
    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
    %% The return value of start_pool/3 is intentionally ignored here.
    %% NOTE(review): confirm start_pool/3 raises on failure; otherwise a failed
    %% start would still be reported as {ok, _} to the caller.
    _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
    {ok, #{poolname => PoolName}}.
%% Logs that the connector is stopping, then stops the pool recorded in the
%% resource state. The result of emqx_plugin_libs_pool:stop_pool/1 is returned.
on_stop(InstId, #{poolname := Pool}) ->
    LogEntry = #{msg => "stopping_mysql_connector", connector => InstId},
    ?SLOG(info, LogEntry),
    emqx_plugin_libs_pool:stop_pool(Pool).
%% Runs an SQL request on a pooled connection.
%%
%% Accepted request shapes:
%%   {sql, SQL}                  - no parameters, default_timeout
%%   {sql, SQL, Params}          - default_timeout
%%   {sql, SQL, Params, Timeout}
%% The shorter shapes are normalised to the 4-tuple form.
%%
%% The query is dispatched through ecpool:pick_and_do/3 with no_handover.
%% An {error, Reason} reply is logged and reported via
%% emqx_resource:query_failed/1; any other reply is reported via
%% emqx_resource:query_success/1. The raw reply is returned in both cases.
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _} = State) ->
    on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _} = State) ->
    on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := Pool} = State) ->
    ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
    Action = {mysql, query, [SQL, Params, Timeout]},
    Reply = ecpool:pick_and_do(Pool, Action, no_handover),
    %% The value of the callbacks below is discarded; only the reply matters.
    case Reply of
        {error, Reason} ->
            ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
                           connector => InstId, sql => SQL, reason => Reason}),
            emqx_resource:query_failed(AfterQuery);
        _ ->
            emqx_resource:query_success(AfterQuery)
    end,
    Reply.
%% Delegates the health check to emqx_plugin_libs_pool:health_check/3,
%% passing do_health_check/1 as the probe and the current state along with it.
on_health_check(_InstId, #{poolname := Pool} = State) ->
    Probe = fun ?MODULE:do_health_check/1,
    emqx_plugin_libs_pool:health_check(Pool, Probe, State).
%% Probe for a single connection: runs a trivial SELECT and returns true
%% when the first element of the reply tuple is ok, false otherwise.
do_health_check(Conn) ->
    Reply = mysql:query(Conn, <<"SELECT count(1) AS T">>),
    element(1, Reply) =:= ok.
%% ===================================================================
%% Maps the boolean auto_reconnect setting to the value placed in the pool
%% options: false stays false, true becomes 15.
%% NOTE(review): 15 is presumably a reconnect interval in seconds - confirm
%% against the pool library.
reconn_interval(false) ->
    false;
reconn_interval(true) ->
    15.
%% Opens one MySQL connection with the given options by calling
%% mysql:start_link/1 and returns its result unchanged. This module is passed
%% as the pool callback module in on_start/2.
connect(ConnOpts) ->
    mysql:start_link(ConnOpts).