diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 002460c95..2d9af6bee 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -212,7 +212,7 @@ prepare_sql_bulk_extend_template(Template, Separator) -> ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), emqx_placeholder:preproc_tmpl(ExtendParamTemplate). -%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can +%% This function is similar to emqx_utils_sql:parse_insert/1 but can %% also handle Clickhouse's SQL extension for INSERT statments that allows the %% user to specify different formats: %% diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index cb0c0e16e..9512d2f6b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -443,11 +443,11 @@ parse_sql_template(Config) -> parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks). parse_sql_template([{Key, H} | T], BatchInsertTks) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_sql_template(T, BatchInsertTks); - {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of + insert -> + case emqx_utils_sql:parse_insert(H) of {ok, {InsertSQL, Params}} -> parse_sql_template( T, @@ -463,6 +463,9 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) -> ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), parse_sql_template(T, BatchInsertTks) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_sql_template(T, BatchInsertTks); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_sql_template(T, BatchInsertTks) @@ -488,10 +491,19 @@ apply_template( undefined -> BatchReqs; #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} -> - SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), + SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), {Key, SQL} end; apply_template(Query, Templates) -> %% TODO: more detail infomatoin ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), {error, failed_to_apply_sql_template}. + +proc_batch_sql(BatchReqs, BatchInserts, Tokens) -> + Values = erlang:iolist_to_binary( + lists:join($,, [ + emqx_placeholder:proc_sql_param_str(Tokens, Msg) + || {_, Msg} <- BatchReqs + ]) + ), + <>. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 6c1d06429..e47a2d083 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -256,10 +256,10 @@ parse_prepare_sql(Config) -> parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); - {ok, insert} -> + insert -> InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of @@ -275,6 +275,9 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) @@ -289,7 +292,7 @@ to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8). split_insert_sql(SQL0) -> - SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), + SQL = formalize_sql(SQL0), lists:filtermap( fun(E) -> case string:trim(E) of @@ -301,3 +304,9 @@ split_insert_sql(SQL0) -> end, re:split(SQL, "(?i)(insert into)|(?i)(values)") ). + +formalize_sql(Input) -> + %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char. + SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]), + %% 2. trims the result + string:trim(SQL). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index a46b21d92..9c40919f2 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -357,11 +357,11 @@ parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) -> }. parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); - {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of + insert -> + case emqx_utils_sql:parse_insert(H) of {ok, {InsertSQL, Params}} -> ParamsTks = emqx_placeholder:preproc_tmpl(Params), parse_prepare_sql( @@ -375,6 +375,9 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index aa9e38316..38dfb492f 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -19,10 +19,7 @@ %% preprocess and process template string with place holders -export([ - split_insert_sql/1, - detect_sql_type/1, - proc_batch_sql/3, - formalize_sql/1 + proc_batch_sql/3 ]). %% type converting @@ -52,44 +49,6 @@ -type tmpl_token() :: list({var, binary()} | {str, binary()}). -%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> --spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when - InsertSQL :: binary(), - Params :: binary(). -split_insert_sql(SQL0) -> - SQL = formalize_sql(SQL0), - case re:split(SQL, "((?i)values)", [{return, binary}]) of - [Part1, _, Part3] -> - case string:trim(Part1, leading) of - <<"insert", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - <<"INSERT", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - _ -> - {error, not_insert_sql} - end; - _ -> - {error, not_insert_sql} - end. - --spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when - Type :: insert | select. -detect_sql_type(SQL) -> - case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of - {match, [First]} -> - Types = [select, insert], - PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types], - LowFirst = string:lowercase(First), - case proplists:lookup(LowFirst, PropTypes) of - {LowFirst, Type} -> - {ok, Type}; - _ -> - {error, invalid_sql} - end; - _ -> - {error, invalid_sql} - end. - -spec proc_batch_sql( BatchReqs :: list({atom(), map()}), InsertPart :: binary(), @@ -104,12 +63,6 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) -> ), <>. -formalize_sql(Input) -> - %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char. - SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]), - %% 2. trims the result - string:trim(SQL). - unsafe_atom_key(Key) when is_atom(Key) -> Key; unsafe_atom_key(Key) when is_binary(Key) -> diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 861548f93..fd40b906d 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -90,8 +90,6 @@ | {tmpl, tmpl_token()} | {value, term()}. --dialyzer({no_improper_lists, [quote_mysql/1, escape_mysql/4, escape_prepend/4]}). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -247,22 +245,15 @@ bin(Val) -> emqx_utils_conv:bin(Val). -spec quote_sql(_Value) -> iolist(). quote_sql(Str) -> - quote_escape(Str, fun escape_sql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => sql}). -spec quote_cql(_Value) -> iolist(). quote_cql(Str) -> - quote_escape(Str, fun escape_cql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => cql}). -spec quote_mysql(_Value) -> iolist(). -quote_mysql(Str) when is_binary(Str) -> - try - escape_mysql(Str) - catch - throw:invalid_utf8 -> - [<<"0x">> | binary:encode_hex(Str)] - end; quote_mysql(Str) -> - quote_escape(Str, fun escape_mysql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}). lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> Value; @@ -370,57 +361,3 @@ unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) -> binary:part(Val, {0, byte_size(Val) - 2}); unwrap(<<"${", Val/binary>>, _StripDoubleQuote) -> binary:part(Val, {0, byte_size(Val) - 1}). - --spec quote_escape(_Value, fun((binary()) -> iodata())) -> iodata(). -quote_escape(Str, EscapeFun) when is_binary(Str) -> - EscapeFun(Str); -quote_escape(Str, EscapeFun) when is_list(Str) -> - case unicode:characters_to_binary(Str) of - Bin when is_binary(Bin) -> - EscapeFun(Bin); - Otherwise -> - error(Otherwise) - end; -quote_escape(Str, EscapeFun) when is_atom(Str) orelse is_map(Str) -> - EscapeFun(bin(Str)); -quote_escape(Val, _EscapeFun) -> - bin(Val). - --spec escape_sql(binary()) -> iolist(). -escape_sql(S) -> - ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]), - [$', ES, $']. - --spec escape_cql(binary()) -> iolist(). -escape_cql(S) -> - ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]), - [$', ES, $']. - --spec escape_mysql(binary()) -> iolist(). -escape_mysql(S0) -> - % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html - [$', escape_mysql(S0, 0, 0, S0), $']. - -%% NOTE -%% This thing looks more complicated than needed because it's optimized for as few -%% intermediate memory (re)allocations as possible. -escape_mysql(<<$', Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<0, Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) -> - CWidth = byte_size(S) - byte_size(Rest), - escape_mysql(Rest, I, Run + CWidth, Src); -escape_mysql(<<>>, 0, _, Src) -> - Src; -escape_mysql(<<>>, I, Run, Src) -> - binary:part(Src, I, Run); -escape_mysql(_, _I, _Run, _Src) -> - throw(invalid_utf8). - -escape_prepend(_RunI, 0, _Src, Tail) -> - Tail; -escape_prepend(I, Run, Src, Tail) -> - [binary:part(Src, I, Run) | Tail]. diff --git a/apps/emqx_utils/src/emqx_utils_sql.erl b/apps/emqx_utils/src/emqx_utils_sql.erl new file mode 100644 index 000000000..3caed6b62 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_sql.erl @@ -0,0 +1,157 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_sql). + +-export([get_statement_type/1]). +-export([parse_insert/1]). + +-export([to_sql_value/1]). +-export([to_sql_string/2]). + +-export([escape_sql/1]). +-export([escape_cql/1]). +-export([escape_mysql/1]). + +-export_type([value/0]). + +-type statement_type() :: select | insert | delete. +-type value() :: null | binary() | number() | boolean() | [value()]. + +-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}). + +-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}. +get_statement_type(Query) -> + KnownTypes = #{ + <<"select">> => select, + <<"insert">> => insert, + <<"delete">> => delete + }, + case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of + {match, [Token]} -> + maps:get(string:lowercase(Token), KnownTypes, {error, unknown}); + _ -> + {error, unknown} + end. + +%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part. +%% SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">> +%% {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3)">>, <<"(${a}, ${b}, ${c.prop})">>}} +-spec parse_insert(iodata()) -> + {ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}. +parse_insert(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. + +%% @doc Convert an Erlang term to a value that can be used primarily in +%% prepared SQL statements. +-spec to_sql_value(term()) -> value(). +to_sql_value(undefined) -> null; +to_sql_value(List) when is_list(List) -> List; +to_sql_value(Bin) when is_binary(Bin) -> Bin; +to_sql_value(Num) when is_number(Num) -> Num; +to_sql_value(Bool) when is_boolean(Bool) -> Bool; +to_sql_value(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map). + +%% @doc Convert an Erlang term to a string that can be interpolated in literal +%% SQL statements. The value is escaped if necessary. +-spec to_sql_string(term(), Options) -> iodata() when + Options :: #{ + escaping => cql | mysql | sql + }. +to_sql_string(String, #{escaping := mysql}) when is_binary(String) -> + try + escape_mysql(String) + catch + throw:invalid_utf8 -> + [<<"0x">>, binary:encode_hex(String)] + end; +to_sql_string(Term, #{escaping := mysql}) -> + maybe_escape(Term, fun escape_mysql/1); +to_sql_string(Term, #{escaping := cql}) -> + maybe_escape(Term, fun escape_cql/1); +to_sql_string(Term, #{}) -> + maybe_escape(Term, fun escape_sql/1). + +-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata(). +maybe_escape(Str, EscapeFun) when is_binary(Str) -> + EscapeFun(Str); +maybe_escape(Str, EscapeFun) when is_list(Str) -> + case unicode:characters_to_binary(Str) of + Bin when is_binary(Bin) -> + EscapeFun(Bin); + Otherwise -> + error(Otherwise) + end; +maybe_escape(Val, EscapeFun) when is_atom(Val) orelse is_map(Val) -> + EscapeFun(emqx_utils_conv:bin(Val)); +maybe_escape(Val, _EscapeFun) -> + emqx_utils_conv:bin(Val). + +-spec escape_sql(binary()) -> iodata(). +escape_sql(S) -> + % NOTE + % This is a bit misleading: currently, escaping logic in `escape_sql/1` likely + % won't work with pgsql since it does not support C-style escapes by default. + % https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS + ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_cql(binary()) -> iodata(). +escape_cql(S) -> + ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_mysql(binary()) -> iodata(). +escape_mysql(S0) -> + % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html + [$', escape_mysql(S0, 0, 0, S0), $']. + +%% NOTE +%% This thing looks more complicated than needed because it's optimized for as few +%% intermediate memory (re)allocations as possible. +escape_mysql(<<$', Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<0, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) -> + CWidth = byte_size(S) - byte_size(Rest), + escape_mysql(Rest, I, Run + CWidth, Src); +escape_mysql(<<>>, 0, _, Src) -> + Src; +escape_mysql(<<>>, I, Run, Src) -> + binary:part(Src, I, Run); +escape_mysql(_, _I, _Run, _Src) -> + throw(invalid_utf8). + +escape_prepend(_RunI, 0, _Src, Tail) -> + Tail; +escape_prepend(I, Run, Src, Tail) -> + [binary:part(Src, I, Run) | Tail].