584 lines
19 KiB
Erlang
584 lines
19 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_mysql).
|
|
|
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-behaviour(emqx_resource).
|
|
|
|
%% callbacks of behaviour emqx_resource
|
|
-export([
|
|
resource_type/0,
|
|
callback_mode/0,
|
|
on_start/2,
|
|
on_stop/2,
|
|
on_query/3,
|
|
on_batch_query/3,
|
|
on_get_status/2,
|
|
on_format_query_result/1
|
|
]).
|
|
|
|
%% ecpool connect & reconnect
|
|
-export([connect/1, prepare_sql_to_conn/2]).
|
|
|
|
-export([
|
|
init_prepare/1,
|
|
prepare_sql/2,
|
|
parse_prepare_sql/1,
|
|
parse_prepare_sql/2,
|
|
unprepare_sql/1
|
|
]).
|
|
|
|
-export([roots/0, fields/1, namespace/0]).
|
|
|
|
-export([do_get_status/1]).
|
|
|
|
-define(MYSQL_HOST_OPTIONS, #{
|
|
default_port => ?MYSQL_DEFAULT_PORT
|
|
}).
|
|
|
|
-type template() :: {unicode:chardata(), emqx_template:str()}.
|
|
-type state() ::
|
|
#{
|
|
pool_name := binary(),
|
|
prepares := ok | {error, _},
|
|
templates := #{{atom(), batch | prepstmt} => template()},
|
|
query_templates := map()
|
|
}.
|
|
-export_type([state/0]).
|
|
%%=====================================================================
|
|
%% Hocon schema
|
|
|
|
namespace() -> mysql.
|
|
|
|
roots() ->
|
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
|
|
|
fields(config) ->
|
|
[{server, server()}] ++
|
|
add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
|
|
emqx_connector_schema_lib:ssl_fields().
|
|
|
|
add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
|
|
Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
|
|
add_default_username([Field | Tail], Head) ->
|
|
add_default_username(Tail, Head ++ [Field]).
|
|
|
|
add_default_fn(OrigFn, Default) ->
|
|
fun
|
|
(default) -> Default;
|
|
(Field) -> OrigFn(Field)
|
|
end.
|
|
|
|
server() ->
|
|
Meta = #{desc => ?DESC("server")},
|
|
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
|
|
|
|
%% ===================================================================
|
|
resource_type() -> mysql.
|
|
|
|
callback_mode() -> always_sync.
|
|
|
|
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
|
|
on_start(
|
|
InstId,
|
|
#{
|
|
server := Server,
|
|
database := DB,
|
|
username := Username,
|
|
pool_size := PoolSize,
|
|
ssl := SSL
|
|
} = Config
|
|
) ->
|
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
|
|
?SLOG(info, #{
|
|
msg => "starting_mysql_connector",
|
|
connector => InstId,
|
|
config => emqx_utils:redact(Config)
|
|
}),
|
|
SslOpts =
|
|
case maps:get(enable, SSL) of
|
|
true ->
|
|
[{ssl, emqx_tls_lib:to_client_opts(SSL)}];
|
|
false ->
|
|
[]
|
|
end,
|
|
Options =
|
|
maybe_add_password_opt(
|
|
maps:get(password, Config, undefined),
|
|
[
|
|
{host, Host},
|
|
{port, Port},
|
|
{user, Username},
|
|
{database, DB},
|
|
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
|
|
{pool_size, PoolSize}
|
|
]
|
|
),
|
|
State = parse_prepare_sql(Config),
|
|
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
|
|
ok ->
|
|
{ok, init_prepare(State#{pool_name => InstId})};
|
|
{error, Reason} ->
|
|
?tp(
|
|
mysql_connector_start_failed,
|
|
#{error => Reason}
|
|
),
|
|
{error, Reason}
|
|
end.
|
|
|
|
maybe_add_password_opt(undefined, Options) ->
|
|
Options;
|
|
maybe_add_password_opt(Password, Options) ->
|
|
[{password, Password} | Options].
|
|
|
|
on_stop(InstId, _State) ->
|
|
?SLOG(info, #{
|
|
msg => "stopping_mysql_connector",
|
|
connector => InstId
|
|
}),
|
|
emqx_resource_pool:stop(InstId).
|
|
|
|
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
|
|
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
|
|
on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
|
|
on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
|
|
on_query(
|
|
InstId,
|
|
{TypeOrKey, SQLOrKey, Params, Timeout},
|
|
State
|
|
) ->
|
|
MySqlFunction = mysql_function(TypeOrKey),
|
|
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
|
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
|
|
{error, not_prepared} ->
|
|
case maybe_prepare_sql(SQLOrKey2, State) of
|
|
ok ->
|
|
?tp(
|
|
mysql_connector_on_query_prepared_sql,
|
|
#{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
|
|
),
|
|
%% not return result, next loop will try again
|
|
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
|
|
{error, Reason} ->
|
|
?tp(
|
|
error,
|
|
"mysql_connector_do_prepare_failed",
|
|
#{
|
|
connector => InstId,
|
|
sql => SQLOrKey,
|
|
state => State,
|
|
reason => Reason
|
|
}
|
|
),
|
|
{error, Reason}
|
|
end;
|
|
Result ->
|
|
Result
|
|
end.
|
|
|
|
on_batch_query(
|
|
InstId,
|
|
BatchReq = [{Key, _} | _],
|
|
#{query_templates := Templates} = State
|
|
) ->
|
|
case maps:get({Key, batch}, Templates, undefined) of
|
|
undefined ->
|
|
{error, {unrecoverable_error, batch_select_not_implemented}};
|
|
Template ->
|
|
on_batch_insert(InstId, BatchReq, Template, State)
|
|
end;
|
|
on_batch_query(
|
|
InstId,
|
|
BatchReq,
|
|
State
|
|
) ->
|
|
?SLOG(error, #{
|
|
msg => "invalid request",
|
|
connector => InstId,
|
|
request => BatchReq,
|
|
state => State
|
|
}),
|
|
{error, {unrecoverable_error, invalid_request}}.
|
|
|
|
on_format_query_result({ok, ColumnNames, Rows}) ->
|
|
#{result => ok, column_names => ColumnNames, rows => Rows};
|
|
on_format_query_result({ok, DataList}) ->
|
|
#{result => ok, column_names_rows_list => DataList};
|
|
on_format_query_result(Result) ->
|
|
Result.
|
|
|
|
mysql_function(sql) ->
|
|
query;
|
|
mysql_function(prepared_query) ->
|
|
execute;
|
|
%% for bridge
|
|
mysql_function(_) ->
|
|
mysql_function(prepared_query).
|
|
|
|
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
|
|
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
|
|
true ->
|
|
case do_check_prepares(State) of
|
|
ok ->
|
|
connected;
|
|
{ok, NState} ->
|
|
%% return new state with prepared statements
|
|
{connected, NState};
|
|
{error, undefined_table} ->
|
|
{disconnected, State, unhealthy_target};
|
|
{error, _Reason} ->
|
|
%% do not log error, it is logged in prepare_sql_to_conn
|
|
connecting
|
|
end;
|
|
false ->
|
|
connecting
|
|
end.
|
|
|
|
do_get_status(Conn) ->
|
|
ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
|
|
|
|
do_check_prepares(
|
|
#{
|
|
pool_name := PoolName,
|
|
templates := #{{send_message, prepstmt} := SQL}
|
|
}
|
|
) ->
|
|
% it's already connected. Verify if target table still exists
|
|
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
|
lists:foldl(
|
|
fun
|
|
(WorkerPid, ok) ->
|
|
case ecpool_worker:client(WorkerPid) of
|
|
{ok, Conn} ->
|
|
case mysql:prepare(Conn, get_status, SQL) of
|
|
{error, {1146, _, _}} ->
|
|
{error, undefined_table};
|
|
{ok, Statement} ->
|
|
mysql:unprepare(Conn, Statement);
|
|
_ ->
|
|
ok
|
|
end;
|
|
_ ->
|
|
ok
|
|
end;
|
|
(_, Acc) ->
|
|
Acc
|
|
end,
|
|
ok,
|
|
Workers
|
|
);
|
|
do_check_prepares(#{prepares := ok}) ->
|
|
ok;
|
|
do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) ->
|
|
%% retry to prepare
|
|
case prepare_sql(State) of
|
|
ok ->
|
|
%% remove the error
|
|
{ok, State#{prepares => ok}};
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end;
|
|
do_check_prepares(_NoTemplates) ->
|
|
ok.
|
|
|
|
%% ===================================================================
|
|
|
|
connect(Options) ->
|
|
%% TODO: teach `tdengine` to accept 0-arity closures as passwords.
|
|
NOptions = init_connect_opts(Options),
|
|
mysql:start_link(NOptions).
|
|
|
|
init_connect_opts(Options) ->
|
|
case lists:keytake(password, 1, Options) of
|
|
{value, {password, Secret}, Rest} ->
|
|
[{password, emqx_secret:unwrap(Secret)} | Rest];
|
|
false ->
|
|
Options
|
|
end.
|
|
|
|
init_prepare(State = #{query_templates := Templates}) ->
|
|
case maps:size(Templates) of
|
|
0 ->
|
|
State#{prepares => ok};
|
|
_ ->
|
|
case prepare_sql(State) of
|
|
ok ->
|
|
State#{prepares => ok};
|
|
{error, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => <<"MySQL init prepare statement failed">>,
|
|
reason => Reason
|
|
}),
|
|
%% mark the prepare_statement as failed
|
|
State#{prepares => {error, Reason}}
|
|
end
|
|
end.
|
|
|
|
maybe_prepare_sql(SQLOrKey, State = #{query_templates := Templates}) ->
|
|
case maps:is_key({SQLOrKey, prepstmt}, Templates) of
|
|
true -> prepare_sql(State);
|
|
false -> {error, {unrecoverable_error, prepared_statement_invalid}}
|
|
end.
|
|
|
|
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
|
|
prepare_sql(maps:to_list(Templates), PoolName).
|
|
|
|
prepare_sql(Templates, PoolName) ->
|
|
case do_prepare_sql(Templates, PoolName) of
|
|
ok ->
|
|
%% prepare for reconnect
|
|
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
|
|
ok;
|
|
{error, R} ->
|
|
{error, R}
|
|
end.
|
|
|
|
do_prepare_sql(Templates, PoolName) ->
|
|
Conns = get_connections_from_pool(PoolName),
|
|
prepare_sql_to_conn_list(Conns, Templates).
|
|
|
|
get_connections_from_pool(PoolName) ->
|
|
[
|
|
begin
|
|
{ok, Conn} = ecpool_worker:client(Worker),
|
|
Conn
|
|
end
|
|
|| {_Name, Worker} <- ecpool:workers(PoolName)
|
|
].
|
|
|
|
prepare_sql_to_conn_list([], _Templates) ->
|
|
ok;
|
|
prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
|
|
case prepare_sql_to_conn(Conn, Templates) of
|
|
ok ->
|
|
prepare_sql_to_conn_list(ConnList, Templates);
|
|
{error, R} ->
|
|
%% rollback
|
|
_ = [unprepare_sql_to_conn(Conn, Template) || Template <- Templates],
|
|
{error, R}
|
|
end.
|
|
|
|
prepare_sql_to_conn(_Conn, []) ->
|
|
ok;
|
|
prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
|
|
LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL},
|
|
?SLOG(info, LogMeta),
|
|
_ = unprepare_sql_to_conn(Conn, Key),
|
|
case mysql:prepare(Conn, Key, SQL) of
|
|
{ok, _Key} ->
|
|
?SLOG(info, LogMeta#{result => success}),
|
|
prepare_sql_to_conn(Conn, Rest);
|
|
{error, {1146, _, _} = Reason} ->
|
|
%% Target table is not created
|
|
?tp(mysql_undefined_table, #{}),
|
|
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
|
|
{error, undefined_table};
|
|
{error, Reason} ->
|
|
% FIXME: we should try to differ on transient failures and
|
|
% syntax failures. Retrying syntax failures is not very productive.
|
|
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
|
|
{error, Reason}
|
|
end;
|
|
prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
|
|
prepare_sql_to_conn(Conn, Rest).
|
|
|
|
unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
|
|
ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
|
|
lists:foreach(
|
|
fun(Conn) ->
|
|
lists:foreach(
|
|
fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
|
|
maps:to_list(Templates)
|
|
)
|
|
end,
|
|
get_connections_from_pool(PoolName)
|
|
).
|
|
|
|
unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
|
|
mysql:unprepare(Conn, Key);
|
|
unprepare_sql_to_conn(Conn, Key) when is_atom(Key) ->
|
|
mysql:unprepare(Conn, Key);
|
|
unprepare_sql_to_conn(_Conn, _) ->
|
|
ok.
|
|
|
|
parse_prepare_sql(Config) ->
|
|
parse_prepare_sql(send_message, Config).
|
|
|
|
parse_prepare_sql(Key, Config) ->
|
|
Queries =
|
|
case Config of
|
|
#{prepare_statement := Qs} ->
|
|
Qs;
|
|
#{sql := Query} ->
|
|
#{Key => Query};
|
|
_ ->
|
|
#{}
|
|
end,
|
|
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
|
|
#{query_templates => Templates}.
|
|
|
|
parse_prepare_sql(Key, Query, Acc) ->
|
|
Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '?'}),
|
|
AccNext = Acc#{{Key, prepstmt} => Template},
|
|
parse_batch_sql(Key, Query, AccNext).
|
|
|
|
parse_batch_sql(Key, Query, Acc) ->
|
|
case emqx_utils_sql:get_statement_type(Query) of
|
|
insert ->
|
|
case emqx_utils_sql:parse_insert(Query) of
|
|
{ok, {Insert, Params}} ->
|
|
RowTemplate = emqx_template_sql:parse(Params),
|
|
Acc#{{Key, batch} => {Insert, RowTemplate}};
|
|
{error, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => "parse insert sql statement failed",
|
|
sql => Query,
|
|
reason => Reason
|
|
}),
|
|
Acc
|
|
end;
|
|
select ->
|
|
Acc;
|
|
Type ->
|
|
?SLOG(error, #{
|
|
msg => "invalid sql statement type",
|
|
sql => Query,
|
|
type => Type
|
|
}),
|
|
Acc
|
|
end.
|
|
|
|
proc_sql_params(query, SQLOrKey, Params, _State) ->
|
|
{SQLOrKey, Params};
|
|
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
|
|
{SQLOrKey, Params};
|
|
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
|
|
case maps:get({TypeOrKey, prepstmt}, Templates, undefined) of
|
|
undefined ->
|
|
{SQLOrData, Params};
|
|
{_InsertPart, RowTemplate} ->
|
|
% NOTE
|
|
% Ignoring errors here, missing variables are set to `null`.
|
|
{Row, _Errors} = emqx_template_sql:render_prepstmt(
|
|
RowTemplate,
|
|
{emqx_jsonish, SQLOrData}
|
|
),
|
|
{TypeOrKey, Row}
|
|
end;
|
|
proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) ->
|
|
{SQLOrData, Params}.
|
|
|
|
on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
|
|
Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],
|
|
Query = [InsertPart, <<" values ">> | lists:join($,, Rows)],
|
|
on_sql_query(InstId, query, Query, no_params, default_timeout, State).
|
|
|
|
render_row(RowTemplate, Data) ->
|
|
% NOTE
|
|
% Ignoring errors here, missing variables are set to "'undefined'" due to backward
|
|
% compatibility requirements.
|
|
RenderOpts = #{escaping => mysql, undefined => <<"undefined">>},
|
|
{Row, _Errors} = emqx_template_sql:render(RowTemplate, {emqx_jsonish, Data}, RenderOpts),
|
|
Row.
|
|
|
|
on_sql_query(
|
|
InstId,
|
|
SQLFunc,
|
|
SQLOrKey,
|
|
Params,
|
|
Timeout,
|
|
#{pool_name := PoolName} = State
|
|
) ->
|
|
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
|
|
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
|
ChannelID = maps:get(channel_id, State, no_channel),
|
|
emqx_trace:rendered_action_template(
|
|
ChannelID,
|
|
#{
|
|
sql_or_key => SQLOrKey,
|
|
parameters => Params
|
|
}
|
|
),
|
|
Worker = ecpool:get_client(PoolName),
|
|
case ecpool_worker:client(Worker) of
|
|
{ok, Conn} ->
|
|
?tp(
|
|
mysql_connector_send_query,
|
|
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Params}
|
|
),
|
|
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta);
|
|
{error, disconnected} ->
|
|
?tp(
|
|
error,
|
|
"mysql_connector_do_sql_query_failed",
|
|
LogMeta#{reason => worker_is_disconnected}
|
|
),
|
|
{error, {recoverable_error, disconnected}}
|
|
end.
|
|
|
|
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta) ->
|
|
try mysql:SQLFunc(Conn, SQLOrKey, Params, no_filtermap_fun, Timeout) of
|
|
{error, disconnected} ->
|
|
?SLOG(
|
|
error,
|
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
|
|
),
|
|
%% kill the pool worker to trigger reconnection
|
|
_ = exit(Conn, restart),
|
|
{error, {recoverable_error, disconnected}};
|
|
{error, not_prepared} = Error ->
|
|
?tp(
|
|
mysql_connector_prepare_query_failed,
|
|
#{error => not_prepared}
|
|
),
|
|
?SLOG(
|
|
warning,
|
|
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
|
),
|
|
Error;
|
|
{error, {1053, <<"08S01">>, Reason}} ->
|
|
%% mysql sql server shutdown in progress
|
|
?SLOG(
|
|
error,
|
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
|
|
),
|
|
{error, {recoverable_error, Reason}};
|
|
{error, Reason} ->
|
|
?SLOG(
|
|
error,
|
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
|
|
),
|
|
{error, {unrecoverable_error, Reason}};
|
|
Result ->
|
|
?tp(
|
|
mysql_connector_query_return,
|
|
#{result => Result}
|
|
),
|
|
Result
|
|
catch
|
|
error:badarg ->
|
|
?SLOG(
|
|
error,
|
|
LogMeta#{msg => "mysql_connector_invalid_params", params => Params}
|
|
),
|
|
{error, {unrecoverable_error, {invalid_params, Params}}}
|
|
end.
|