refactor(pluglib): provide SQL related utils in `emqx_utils_sql`

This commit is contained in:
Andrew Mayorov 2023-06-09 00:21:39 +03:00
parent a51baaa206
commit 8919a6ef93
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
7 changed files with 199 additions and 128 deletions

View File

@ -212,7 +212,7 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
emqx_placeholder:preproc_tmpl(ExtendParamTemplate). emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can %% This function is similar to emqx_utils_sql:parse_insert/1 but can
%% also handle Clickhouse's SQL extension for INSERT statements that allows the %% also handle Clickhouse's SQL extension for INSERT statements that allows the
%% user to specify different formats: %% user to specify different formats:
%% %%

View File

@ -443,11 +443,11 @@ parse_sql_template(Config) ->
parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks). parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks).
parse_sql_template([{Key, H} | T], BatchInsertTks) -> parse_sql_template([{Key, H} | T], BatchInsertTks) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of case emqx_utils_sql:get_statement_type(H) of
{ok, select} -> select ->
parse_sql_template(T, BatchInsertTks); parse_sql_template(T, BatchInsertTks);
{ok, insert} -> insert ->
case emqx_plugin_libs_rule:split_insert_sql(H) of case emqx_utils_sql:parse_insert(H) of
{ok, {InsertSQL, Params}} -> {ok, {InsertSQL, Params}} ->
parse_sql_template( parse_sql_template(
T, T,
@ -463,6 +463,9 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
parse_sql_template(T, BatchInsertTks) parse_sql_template(T, BatchInsertTks)
end; end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_sql_template(T, BatchInsertTks);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_sql_template(T, BatchInsertTks) parse_sql_template(T, BatchInsertTks)
@ -488,10 +491,19 @@ apply_template(
undefined -> undefined ->
BatchReqs; BatchReqs;
#{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} -> #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
{Key, SQL} {Key, SQL}
end; end;
apply_template(Query, Templates) -> apply_template(Query, Templates) ->
%% TODO: more detailed information %% TODO: more detailed information
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
{error, failed_to_apply_sql_template}. {error, failed_to_apply_sql_template}.
%% Build a single batch INSERT statement: the given insert part, followed by
%% " values " and one rendered row per batch request, rows separated by commas.
%% Batch entries that are not 2-tuples are skipped by the comprehension.
proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
    Rows = [emqx_placeholder:proc_sql_param_str(Tokens, Msg) || {_, Msg} <- BatchReqs],
    Values = erlang:iolist_to_binary(lists:join($,, Rows)),
    <<BatchInserts/binary, " values ", Values/binary>>.

View File

@ -256,10 +256,10 @@ parse_prepare_sql(Config) ->
parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of case emqx_utils_sql:get_statement_type(H) of
{ok, select} -> select ->
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{ok, insert} -> insert ->
InsertTks = emqx_placeholder:preproc_tmpl(H), InsertTks = emqx_placeholder:preproc_tmpl(H),
H1 = string:trim(H, trailing, ";"), H1 = string:trim(H, trailing, ";"),
case split_insert_sql(H1) of case split_insert_sql(H1) of
@ -275,6 +275,9 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
end; end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
@ -289,7 +292,7 @@ to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8). unicode:characters_to_binary(List, utf8).
split_insert_sql(SQL0) -> split_insert_sql(SQL0) ->
SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), SQL = formalize_sql(SQL0),
lists:filtermap( lists:filtermap(
fun(E) -> fun(E) ->
case string:trim(E) of case string:trim(E) of
@ -301,3 +304,9 @@ split_insert_sql(SQL0) ->
end, end,
re:split(SQL, "(?i)(insert into)|(?i)(values)") re:split(SQL, "(?i)(insert into)|(?i)(values)")
). ).
%% Normalize whitespace in an SQL string: every run of whitespace characters
%% (including '\r' and '\n') is collapsed into one space, then the result is
%% trimmed on both ends. Always returns a binary.
formalize_sql(Input) ->
    Collapsed = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
    string:trim(Collapsed).

View File

@ -357,11 +357,11 @@ parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
}. }.
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) -> parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of case emqx_utils_sql:get_statement_type(H) of
{ok, select} -> select ->
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
{ok, insert} -> insert ->
case emqx_plugin_libs_rule:split_insert_sql(H) of case emqx_utils_sql:parse_insert(H) of
{ok, {InsertSQL, Params}} -> {ok, {InsertSQL, Params}} ->
ParamsTks = emqx_placeholder:preproc_tmpl(Params), ParamsTks = emqx_placeholder:preproc_tmpl(Params),
parse_prepare_sql( parse_prepare_sql(
@ -375,6 +375,9 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
end; end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)

View File

@ -19,10 +19,7 @@
%% preprocess and process template string with place holders %% preprocess and process template string with place holders
-export([ -export([
split_insert_sql/1, proc_batch_sql/3
detect_sql_type/1,
proc_batch_sql/3,
formalize_sql/1
]). ]).
%% type converting %% type converting
@ -52,44 +49,6 @@
-type tmpl_token() :: list({var, binary()} | {str, binary()}). -type tmpl_token() :: list({var, binary()} | {str, binary()}).
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
InsertSQL :: binary(),
Params :: binary().
split_insert_sql(SQL0) ->
SQL = formalize_sql(SQL0),
case re:split(SQL, "((?i)values)", [{return, binary}]) of
[Part1, _, Part3] ->
case string:trim(Part1, leading) of
<<"insert", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
<<"INSERT", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
_ ->
{error, not_insert_sql}
end;
_ ->
{error, not_insert_sql}
end.
-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
Type :: insert | select.
detect_sql_type(SQL) ->
case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
{match, [First]} ->
Types = [select, insert],
PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
LowFirst = string:lowercase(First),
case proplists:lookup(LowFirst, PropTypes) of
{LowFirst, Type} ->
{ok, Type};
_ ->
{error, invalid_sql}
end;
_ ->
{error, invalid_sql}
end.
-spec proc_batch_sql( -spec proc_batch_sql(
BatchReqs :: list({atom(), map()}), BatchReqs :: list({atom(), map()}),
InsertPart :: binary(), InsertPart :: binary(),
@ -104,12 +63,6 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
), ),
<<InsertPart/binary, " values ", ValuesPart/binary>>. <<InsertPart/binary, " values ", ValuesPart/binary>>.
formalize_sql(Input) ->
%% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
%% 2. trims the result
string:trim(SQL).
unsafe_atom_key(Key) when is_atom(Key) -> unsafe_atom_key(Key) when is_atom(Key) ->
Key; Key;
unsafe_atom_key(Key) when is_binary(Key) -> unsafe_atom_key(Key) when is_binary(Key) ->

View File

@ -90,8 +90,6 @@
| {tmpl, tmpl_token()} | {tmpl, tmpl_token()}
| {value, term()}. | {value, term()}.
-dialyzer({no_improper_lists, [quote_mysql/1, escape_mysql/4, escape_prepend/4]}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -247,22 +245,15 @@ bin(Val) -> emqx_utils_conv:bin(Val).
-spec quote_sql(_Value) -> iolist(). -spec quote_sql(_Value) -> iolist().
quote_sql(Str) -> quote_sql(Str) ->
quote_escape(Str, fun escape_sql/1). emqx_utils_sql:to_sql_string(Str, #{escaping => sql}).
-spec quote_cql(_Value) -> iolist(). -spec quote_cql(_Value) -> iolist().
quote_cql(Str) -> quote_cql(Str) ->
quote_escape(Str, fun escape_cql/1). emqx_utils_sql:to_sql_string(Str, #{escaping => cql}).
-spec quote_mysql(_Value) -> iolist(). -spec quote_mysql(_Value) -> iolist().
quote_mysql(Str) when is_binary(Str) ->
try
escape_mysql(Str)
catch
throw:invalid_utf8 ->
[<<"0x">> | binary:encode_hex(Str)]
end;
quote_mysql(Str) -> quote_mysql(Str) ->
quote_escape(Str, fun escape_mysql/1). emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}).
lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
Value; Value;
@ -370,57 +361,3 @@ unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) ->
binary:part(Val, {0, byte_size(Val) - 2}); binary:part(Val, {0, byte_size(Val) - 2});
unwrap(<<"${", Val/binary>>, _StripDoubleQuote) -> unwrap(<<"${", Val/binary>>, _StripDoubleQuote) ->
binary:part(Val, {0, byte_size(Val) - 1}). binary:part(Val, {0, byte_size(Val) - 1}).
-spec quote_escape(_Value, fun((binary()) -> iodata())) -> iodata().
quote_escape(Str, EscapeFun) when is_binary(Str) ->
EscapeFun(Str);
quote_escape(Str, EscapeFun) when is_list(Str) ->
case unicode:characters_to_binary(Str) of
Bin when is_binary(Bin) ->
EscapeFun(Bin);
Otherwise ->
error(Otherwise)
end;
quote_escape(Str, EscapeFun) when is_atom(Str) orelse is_map(Str) ->
EscapeFun(bin(Str));
quote_escape(Val, _EscapeFun) ->
bin(Val).
-spec escape_sql(binary()) -> iolist().
escape_sql(S) ->
ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_cql(binary()) -> iolist().
escape_cql(S) ->
ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_mysql(binary()) -> iolist().
escape_mysql(S0) ->
% https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
[$', escape_mysql(S0, 0, 0, S0), $'].
%% NOTE
%% This thing looks more complicated than needed because it's optimized for as few
%% intermediate memory (re)allocations as possible.
escape_mysql(<<$', Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<0, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) ->
CWidth = byte_size(S) - byte_size(Rest),
escape_mysql(Rest, I, Run + CWidth, Src);
escape_mysql(<<>>, 0, _, Src) ->
Src;
escape_mysql(<<>>, I, Run, Src) ->
binary:part(Src, I, Run);
escape_mysql(_, _I, _Run, _Src) ->
throw(invalid_utf8).
escape_prepend(_RunI, 0, _Src, Tail) ->
Tail;
escape_prepend(I, Run, Src, Tail) ->
[binary:part(Src, I, Run) | Tail].

View File

@ -0,0 +1,157 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_utils_sql).
-export([get_statement_type/1]).
-export([parse_insert/1]).
-export([to_sql_value/1]).
-export([to_sql_string/2]).
-export([escape_sql/1]).
-export([escape_cql/1]).
-export([escape_mysql/1]).
-export_type([value/0]).
-type statement_type() :: select | insert | delete.
-type value() :: null | binary() | number() | boolean() | [value()].
-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}).
%% @doc Classify an SQL statement by its first keyword.
%% Leading whitespace is ignored and the keyword is matched case-insensitively.
%% Returns `{error, unknown}' when the statement does not start with a run of
%% ASCII letters, or when that keyword is not one of the known types.
-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}.
get_statement_type(Query) ->
    case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of
        {match, [Keyword]} ->
            statement_type_of(string:lowercase(Keyword));
        _ ->
            {error, unknown}
    end.

%% Map a lowercased leading keyword to its statement type.
statement_type_of(<<"select">>) -> select;
statement_type_of(<<"insert">>) -> insert;
statement_type_of(<<"delete">>) -> delete;
statement_type_of(_Other) -> {error, unknown}.
%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part.
%%
%% The statement is split around the keyword VALUES (matched in any letter
%% case). Leading whitespace is removed from the INSERT part; whitespace next
%% to the VALUES keyword is preserved in both parts:
%%   SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">>
%%   {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3) ">>, <<" (${a}, ${b}, ${c.prop})">>}}
%%
%% Returns `{error, not_insert_sql}' when the text before VALUES does not
%% start with INSERT (in any letter case), or when the text "values" does not
%% occur exactly once in the statement.
-spec parse_insert(iodata()) ->
    {ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}.
parse_insert(SQL) ->
    case re:split(SQL, "((?i)values)", [{return, binary}]) of
        [Part1, _, Part3] ->
            InsertSQL = string:trim(Part1, leading),
            %% The keyword check is case-insensitive so that it accepts the
            %% same statements get_statement_type/1 classifies as `insert'
            %% (e.g. "Insert Into ..."), not only all-lower / all-upper forms.
            case re:run(InsertSQL, "^insert", [caseless, {capture, none}]) of
                match ->
                    {ok, {InsertSQL, Part3}};
                nomatch ->
                    {error, not_insert_sql}
            end;
        _ ->
            {error, not_insert_sql}
    end.
%% @doc Turn an Erlang term into a value usable as a parameter of a prepared
%% SQL statement.
%% `undefined' becomes `null'; lists, binaries, numbers and booleans are passed
%% through untouched; any other atom is rendered as its UTF-8 name; maps are
%% serialized with `emqx_utils_json:encode/1'.
-spec to_sql_value(term()) -> value().
to_sql_value(undefined) ->
    null;
to_sql_value(Term) when
    is_list(Term); is_binary(Term); is_number(Term); is_boolean(Term)
->
    Term;
to_sql_value(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
to_sql_value(Map) when is_map(Map) ->
    emqx_utils_json:encode(Map).
%% @doc Render an Erlang term as text that can be interpolated into a literal
%% SQL statement, escaping it when necessary.
%% The `escaping' option selects the escaping rules: `mysql', `cql', or the
%% generic SQL rules for any other value or when the option is absent.
-spec to_sql_string(term(), Options) -> iodata() when
    Options :: #{
        escaping => cql | mysql | sql
    }.
to_sql_string(Term, #{} = Options) ->
    case maps:get(escaping, Options, sql) of
        mysql when is_binary(Term) ->
            %% A binary that is not valid UTF-8 makes the MySQL escaper throw;
            %% such input is emitted as a hexadecimal literal instead.
            try
                escape_mysql(Term)
            catch
                throw:invalid_utf8 ->
                    [<<"0x">>, binary:encode_hex(Term)]
            end;
        mysql ->
            maybe_escape(Term, fun escape_mysql/1);
        cql ->
            maybe_escape(Term, fun escape_cql/1);
        _ ->
            maybe_escape(Term, fun escape_sql/1)
    end.
%% Apply `Escape' to values that are rendered as strings and leave the rest
%% unquoted:
%%  - binaries are escaped as is;
%%  - lists are converted to a UTF-8 binary first; a failed conversion raises
%%    an error carrying the result of unicode:characters_to_binary/1;
%%  - atoms and maps are converted with emqx_utils_conv:bin/1, then escaped;
%%  - anything else is converted with emqx_utils_conv:bin/1 without escaping.
-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata().
maybe_escape(Value, Escape) ->
    if
        is_binary(Value) ->
            Escape(Value);
        is_list(Value) ->
            case unicode:characters_to_binary(Value) of
                Encoded when is_binary(Encoded) ->
                    Escape(Encoded);
                Failure ->
                    error(Failure)
            end;
        is_atom(Value) orelse is_map(Value) ->
            Escape(emqx_utils_conv:bin(Value));
        true ->
            emqx_utils_conv:bin(Value)
    end.
%% Quote a binary as a single-quoted SQL string literal, prefixing every
%% backslash and single quote with a backslash.
%% NOTE
%% This is a bit misleading: this escaping likely won't work with pgsql since
%% it does not support C-style escapes by default.
%% https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
-spec escape_sql(binary()) -> iodata().
escape_sql(S) ->
    Special = [<<"\\">>, <<"'">>],
    %% `insert_replaced' puts the matched character after the backslash.
    Opts = [global, {insert_replaced, 1}],
    [$', binary:replace(S, Special, <<"\\">>, Opts), $'].
%% Quote a binary as a single-quoted CQL string literal; every single quote
%% inside the value is doubled.
-spec escape_cql(binary()) -> iodata().
escape_cql(S) ->
    Doubled = binary:replace(S, <<"'">>, <<"''">>, [global]),
    [$', Doubled, $'].
%% Quote a binary as a single-quoted MySQL string literal. Single quotes,
%% backslashes and NUL bytes are backslash-escaped.
%% Throws `invalid_utf8' when the input is not valid UTF-8.
-spec escape_mysql(binary()) -> iodata().
escape_mysql(Bin) ->
    % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
    [$', escape_mysql(Bin, 0, 0, Bin), $'].

%% NOTE
%% Written this way to keep intermediate memory (re)allocations to a minimum:
%% instead of copying characters one by one, the scanner tracks the current
%% run of characters that need no escaping as an offset (`Pos') and a byte
%% length (`Len') into the original binary (`Orig'), and slices that run out
%% only when an escape has to be inserted or the input ends.
escape_mysql(<<$', Tail/binary>>, Pos, Len, Orig) ->
    escape_prepend(Pos, Len, Orig, [<<"\\'">> | escape_mysql(Tail, Pos + Len + 1, 0, Orig)]);
escape_mysql(<<$\\, Tail/binary>>, Pos, Len, Orig) ->
    escape_prepend(Pos, Len, Orig, [<<"\\\\">> | escape_mysql(Tail, Pos + Len + 1, 0, Orig)]);
escape_mysql(<<0, Tail/binary>>, Pos, Len, Orig) ->
    escape_prepend(Pos, Len, Orig, [<<"\\0">> | escape_mysql(Tail, Pos + Len + 1, 0, Orig)]);
escape_mysql(<<_/utf8, Tail/binary>> = Bin, Pos, Len, Orig) ->
    %% Extend the current run by the encoded width of this character.
    escape_mysql(Tail, Pos, Len + (byte_size(Bin) - byte_size(Tail)), Orig);
escape_mysql(<<>>, 0, _, Orig) ->
    %% Nothing was escaped: the original binary is returned untouched.
    Orig;
escape_mysql(<<>>, Pos, Len, Orig) ->
    binary:part(Orig, Pos, Len);
escape_mysql(_, _Pos, _Len, _Orig) ->
    throw(invalid_utf8).

%% Put the pending run of unescaped bytes (if any) in front of `Tail'.
escape_prepend(Pos, Len, Orig, Tail) ->
    case Len of
        0 ->
            Tail;
        _ ->
            [binary:part(Orig, Pos, Len) | Tail]
    end.