emqx/apps/emqx_mysql/src/emqx_mysql.erl

584 lines
19 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mysql).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
resource_type/0,
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2,
on_format_query_result/1
]).
%% ecpool connect & reconnect
-export([connect/1, prepare_sql_to_conn/2]).
-export([
init_prepare/1,
prepare_sql/2,
parse_prepare_sql/1,
parse_prepare_sql/2,
unprepare_sql/1
]).
-export([roots/0, fields/1, namespace/0]).
-export([do_get_status/1]).
-define(MYSQL_HOST_OPTIONS, #{
default_port => ?MYSQL_DEFAULT_PORT
}).
-type template() :: {unicode:chardata(), emqx_template:str()}.
-type state() ::
#{
pool_name := binary(),
prepares := ok | {error, _},
templates := #{{atom(), batch | prepstmt} => template()},
query_templates := map()
}.
-export_type([state/0]).
%%=====================================================================
%% Hocon schema
namespace() -> mysql.
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
[{server, server()}] ++
add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
emqx_connector_schema_lib:ssl_fields().
add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
add_default_username([Field | Tail], Head) ->
add_default_username(Tail, Head ++ [Field]).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
%% ===================================================================
resource_type() -> mysql.
callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{
server := Server,
database := DB,
username := Username,
pool_size := PoolSize,
ssl := SSL
} = Config
) ->
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_mysql_connector",
connector => InstId,
config => emqx_utils:redact(Config)
}),
SslOpts =
case maps:get(enable, SSL) of
true ->
[{ssl, emqx_tls_lib:to_client_opts(SSL)}];
false ->
[]
end,
Options =
maybe_add_password_opt(
maps:get(password, Config, undefined),
[
{host, Host},
{port, Port},
{user, Username},
{database, DB},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
]
),
State = parse_prepare_sql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId})};
{error, Reason} ->
?tp(
mysql_connector_start_failed,
#{error => Reason}
),
{error, Reason}
end.
maybe_add_password_opt(undefined, Options) ->
Options;
maybe_add_password_opt(Password, Options) ->
[{password, Password} | Options].
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_mysql_connector",
connector => InstId
}),
emqx_resource_pool:stop(InstId).
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
on_query(
InstId,
{TypeOrKey, SQLOrKey, Params, Timeout},
State
) ->
MySqlFunction = mysql_function(TypeOrKey),
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
{error, not_prepared} ->
case maybe_prepare_sql(SQLOrKey2, State) of
ok ->
?tp(
mysql_connector_on_query_prepared_sql,
#{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
),
%% not return result, next loop will try again
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
{error, Reason} ->
?tp(
error,
"mysql_connector_do_prepare_failed",
#{
connector => InstId,
sql => SQLOrKey,
state => State,
reason => Reason
}
),
{error, Reason}
end;
Result ->
Result
end.
on_batch_query(
InstId,
BatchReq = [{Key, _} | _],
#{query_templates := Templates} = State
) ->
case maps:get({Key, batch}, Templates, undefined) of
undefined ->
{error, {unrecoverable_error, batch_select_not_implemented}};
Template ->
on_batch_insert(InstId, BatchReq, Template, State)
end;
on_batch_query(
InstId,
BatchReq,
State
) ->
?SLOG(error, #{
msg => "invalid request",
connector => InstId,
request => BatchReq,
state => State
}),
{error, {unrecoverable_error, invalid_request}}.
on_format_query_result({ok, ColumnNames, Rows}) ->
#{result => ok, column_names => ColumnNames, rows => Rows};
on_format_query_result({ok, DataList}) ->
#{result => ok, column_names_rows_list => DataList};
on_format_query_result(Result) ->
Result.
mysql_function(sql) ->
query;
mysql_function(prepared_query) ->
execute;
%% for bridge
mysql_function(_) ->
mysql_function(prepared_query).
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
connected;
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
{error, undefined_table} ->
{disconnected, State, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting
end;
false ->
connecting
end.
do_get_status(Conn) ->
ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
do_check_prepares(
#{
pool_name := PoolName,
templates := #{{send_message, prepstmt} := SQL}
}
) ->
% it's already connected. Verify if target table still exists
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
lists:foldl(
fun
(WorkerPid, ok) ->
case ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
case mysql:prepare(Conn, get_status, SQL) of
{error, {1146, _, _}} ->
{error, undefined_table};
{ok, Statement} ->
mysql:unprepare(Conn, Statement);
_ ->
ok
end;
_ ->
ok
end;
(_, Acc) ->
Acc
end,
ok,
Workers
);
do_check_prepares(#{prepares := ok}) ->
ok;
do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) ->
%% retry to prepare
case prepare_sql(State) of
ok ->
%% remove the error
{ok, State#{prepares => ok}};
{error, Reason} ->
{error, Reason}
end;
do_check_prepares(_NoTemplates) ->
ok.
%% ===================================================================
connect(Options) ->
%% TODO: teach `tdengine` to accept 0-arity closures as passwords.
NOptions = init_connect_opts(Options),
mysql:start_link(NOptions).
init_connect_opts(Options) ->
case lists:keytake(password, 1, Options) of
{value, {password, Secret}, Rest} ->
[{password, emqx_secret:unwrap(Secret)} | Rest];
false ->
Options
end.
init_prepare(State = #{query_templates := Templates}) ->
case maps:size(Templates) of
0 ->
State#{prepares => ok};
_ ->
case prepare_sql(State) of
ok ->
State#{prepares => ok};
{error, Reason} ->
?SLOG(error, #{
msg => <<"MySQL init prepare statement failed">>,
reason => Reason
}),
%% mark the prepare_statement as failed
State#{prepares => {error, Reason}}
end
end.
maybe_prepare_sql(SQLOrKey, State = #{query_templates := Templates}) ->
case maps:is_key({SQLOrKey, prepstmt}, Templates) of
true -> prepare_sql(State);
false -> {error, {unrecoverable_error, prepared_statement_invalid}}
end.
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
prepare_sql(maps:to_list(Templates), PoolName).
prepare_sql(Templates, PoolName) ->
case do_prepare_sql(Templates, PoolName) of
ok ->
%% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
ok;
{error, R} ->
{error, R}
end.
do_prepare_sql(Templates, PoolName) ->
Conns = get_connections_from_pool(PoolName),
prepare_sql_to_conn_list(Conns, Templates).
get_connections_from_pool(PoolName) ->
[
begin
{ok, Conn} = ecpool_worker:client(Worker),
Conn
end
|| {_Name, Worker} <- ecpool:workers(PoolName)
].
prepare_sql_to_conn_list([], _Templates) ->
ok;
prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
case prepare_sql_to_conn(Conn, Templates) of
ok ->
prepare_sql_to_conn_list(ConnList, Templates);
{error, R} ->
%% rollback
_ = [unprepare_sql_to_conn(Conn, Template) || Template <- Templates],
{error, R}
end.
prepare_sql_to_conn(_Conn, []) ->
ok;
prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL},
?SLOG(info, LogMeta),
_ = unprepare_sql_to_conn(Conn, Key),
case mysql:prepare(Conn, Key, SQL) of
{ok, _Key} ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, Rest);
{error, {1146, _, _} = Reason} ->
%% Target table is not created
?tp(mysql_undefined_table, #{}),
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, undefined_table};
{error, Reason} ->
% FIXME: we should try to differ on transient failures and
% syntax failures. Retrying syntax failures is not very productive.
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, Reason}
end;
prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
prepare_sql_to_conn(Conn, Rest).
unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
lists:foreach(
fun(Conn) ->
lists:foreach(
fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
maps:to_list(Templates)
)
end,
get_connections_from_pool(PoolName)
).
unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
mysql:unprepare(Conn, Key);
unprepare_sql_to_conn(Conn, Key) when is_atom(Key) ->
mysql:unprepare(Conn, Key);
unprepare_sql_to_conn(_Conn, _) ->
ok.
parse_prepare_sql(Config) ->
parse_prepare_sql(send_message, Config).
parse_prepare_sql(Key, Config) ->
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
#{Key => Query};
_ ->
#{}
end,
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
#{query_templates => Templates}.
parse_prepare_sql(Key, Query, Acc) ->
Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '?'}),
AccNext = Acc#{{Key, prepstmt} => Template},
parse_batch_sql(Key, Query, AccNext).
parse_batch_sql(Key, Query, Acc) ->
case emqx_utils_sql:get_statement_type(Query) of
insert ->
case emqx_utils_sql:parse_insert(Query) of
{ok, {Insert, Params}} ->
RowTemplate = emqx_template_sql:parse(Params),
Acc#{{Key, batch} => {Insert, RowTemplate}};
{error, Reason} ->
?SLOG(error, #{
msg => "parse insert sql statement failed",
sql => Query,
reason => Reason
}),
Acc
end;
select ->
Acc;
Type ->
?SLOG(error, #{
msg => "invalid sql statement type",
sql => Query,
type => Type
}),
Acc
end.
proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
case maps:get({TypeOrKey, prepstmt}, Templates, undefined) of
undefined ->
{SQLOrData, Params};
{_InsertPart, RowTemplate} ->
% NOTE
% Ignoring errors here, missing variables are set to `null`.
{Row, _Errors} = emqx_template_sql:render_prepstmt(
RowTemplate,
{emqx_jsonish, SQLOrData}
),
{TypeOrKey, Row}
end;
proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) ->
{SQLOrData, Params}.
on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],
Query = [InsertPart, <<" values ">> | lists:join($,, Rows)],
on_sql_query(InstId, query, Query, no_params, default_timeout, State).
render_row(RowTemplate, Data) ->
% NOTE
% Ignoring errors here, missing variables are set to "'undefined'" due to backward
% compatibility requirements.
RenderOpts = #{escaping => mysql, undefined => <<"undefined">>},
{Row, _Errors} = emqx_template_sql:render(RowTemplate, {emqx_jsonish, Data}, RenderOpts),
Row.
on_sql_query(
InstId,
SQLFunc,
SQLOrKey,
Params,
Timeout,
#{pool_name := PoolName} = State
) ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta),
ChannelID = maps:get(channel_id, State, no_channel),
emqx_trace:rendered_action_template(
ChannelID,
#{
sql_or_key => SQLOrKey,
parameters => Params
}
),
Worker = ecpool:get_client(PoolName),
case ecpool_worker:client(Worker) of
{ok, Conn} ->
?tp(
mysql_connector_send_query,
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Params}
),
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta);
{error, disconnected} ->
?tp(
error,
"mysql_connector_do_sql_query_failed",
LogMeta#{reason => worker_is_disconnected}
),
{error, {recoverable_error, disconnected}}
end.
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta) ->
try mysql:SQLFunc(Conn, SQLOrKey, Params, no_filtermap_fun, Timeout) of
{error, disconnected} ->
?SLOG(
error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
),
%% kill the pool worker to trigger reconnection
_ = exit(Conn, restart),
{error, {recoverable_error, disconnected}};
{error, not_prepared} = Error ->
?tp(
mysql_connector_prepare_query_failed,
#{error => not_prepared}
),
?SLOG(
warning,
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
),
Error;
{error, {1053, <<"08S01">>, Reason}} ->
%% mysql sql server shutdown in progress
?SLOG(
error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
),
{error, {recoverable_error, Reason}};
{error, Reason} ->
?SLOG(
error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
),
{error, {unrecoverable_error, Reason}};
Result ->
?tp(
mysql_connector_query_return,
#{result => Result}
),
Result
catch
error:badarg ->
?SLOG(
error,
LogMeta#{msg => "mysql_connector_invalid_params", params => Params}
),
{error, {unrecoverable_error, {invalid_params, Params}}}
end.