diff --git a/apps/emqx_connector/src/emqx_connector_sql.erl b/apps/emqx_connector/src/emqx_connector_sql.erl new file mode 100644 index 000000000..be0b220e6 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_sql.erl @@ -0,0 +1,159 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_sql). + +-export([get_statement_type/1]). +-export([parse_insert/1]). + +-export([to_sql_value/1]). +-export([to_sql_string/2]). + +-export([escape_sql/1]). +-export([escape_cql/1]). +-export([escape_mysql/1]). + +-export_type([value/0]). + +-type statement_type() :: select | insert | delete. +-type value() :: null | binary() | number() | boolean() | [value()]. + +-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}). + +-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}. +get_statement_type(Query) -> + KnownTypes = #{ + <<"select">> => select, + <<"insert">> => insert, + <<"delete">> => delete + }, + case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of + {match, [Token]} -> + maps:get(string:lowercase(Token), KnownTypes, {error, unknown}); + _ -> + {error, unknown} + end. + +%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part. +%% SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">> +%% {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3)">>, <<"(${a}, ${b}, ${c.prop})">>}} +-spec parse_insert(iodata()) -> + {ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}. +parse_insert(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. + +%% @doc Convert an Erlang term to a value that can be used primarily in +%% prepared SQL statements. +-spec to_sql_value(term()) -> value(). +to_sql_value(undefined) -> null; +to_sql_value(List) when is_list(List) -> List; +to_sql_value(Bin) when is_binary(Bin) -> Bin; +to_sql_value(Num) when is_number(Num) -> Num; +to_sql_value(Bool) when is_boolean(Bool) -> Bool; +to_sql_value(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map). + +%% @doc Convert an Erlang term to a string that can be interpolated in literal +%% SQL statements. The value is escaped if necessary. +-spec to_sql_string(term(), Options) -> iodata() when + Options :: #{ + escaping => cql | mysql | sql + }. +to_sql_string(String, #{escaping := mysql}) when is_binary(String) -> + try + escape_mysql(String) + catch + throw:invalid_utf8 -> + [<<"0x">>, binary:encode_hex(String)] + end; +to_sql_string(Term, #{escaping := mysql}) -> + maybe_escape(Term, fun escape_mysql/1); +to_sql_string(Term, #{escaping := cql}) -> + maybe_escape(Term, fun escape_cql/1); +to_sql_string(Term, #{}) -> + maybe_escape(Term, fun escape_sql/1). + +-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata(). +maybe_escape(undefined, _EscapeFun) -> + <<"NULL">>; +maybe_escape(Str, EscapeFun) when is_binary(Str) -> + EscapeFun(Str); +maybe_escape(Str, EscapeFun) when is_list(Str) -> + case unicode:characters_to_binary(Str) of + Bin when is_binary(Bin) -> + EscapeFun(Bin); + Otherwise -> + error(Otherwise) + end; +maybe_escape(Val, EscapeFun) when is_atom(Val) orelse is_map(Val) -> + EscapeFun(emqx_connector_template:to_string(Val)); +maybe_escape(Val, _EscapeFun) -> + emqx_connector_template:to_string(Val). + +-spec escape_sql(binary()) -> iodata(). +escape_sql(S) -> + % NOTE + % This is a bit misleading: currently, escaping logic in `escape_sql/1` likely + % won't work with pgsql since it does not support C-style escapes by default. + % https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS + ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_cql(binary()) -> iodata(). +escape_cql(S) -> + ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_mysql(binary()) -> iodata(). +escape_mysql(S0) -> + % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html + [$', escape_mysql(S0, 0, 0, S0), $']. + +%% NOTE +%% This thing looks more complicated than needed because it's optimized for as few +%% intermediate memory (re)allocations as possible. +escape_mysql(<<$', Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<0, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) -> + CWidth = byte_size(S) - byte_size(Rest), + escape_mysql(Rest, I, Run + CWidth, Src); +escape_mysql(<<>>, 0, _, Src) -> + Src; +escape_mysql(<<>>, I, Run, Src) -> + binary:part(Src, I, Run); +escape_mysql(_, _I, _Run, _Src) -> + throw(invalid_utf8). + +escape_prepend(_RunI, 0, _Src, Tail) -> + Tail; +escape_prepend(I, Run, Src, Tail) -> + [binary:part(Src, I, Run) | Tail]. diff --git a/apps/emqx_connector/src/emqx_connector_template.erl b/apps/emqx_connector/src/emqx_connector_template.erl new file mode 100644 index 000000000..c346d4289 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_template.erl @@ -0,0 +1,351 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_template). + +-include_lib("emqx/include/emqx_placeholder.hrl"). + +-export([parse/1]). +-export([parse/2]). +-export([parse_deep/1]). +-export([parse_deep/2]). +-export([validate/2]). +-export([trivial/1]). +-export([unparse/1]). +-export([render/2]). +-export([render/3]). +-export([render_strict/2]). +-export([render_strict/3]). + +-export([to_string/1]). + +-export_type([t/0]). +-export_type([str/0]). +-export_type([deep/0]). +-export_type([placeholder/0]). +-export_type([bindings/0]). + +-type t() :: str() | {'$tpl', deeptpl()}. + +-type str() :: [unicode:chardata() | placeholder()]. +-type deep() :: {'$tpl', deeptpl()}. + +-type deeptpl() :: + t() + | #{deeptpl() => deeptpl()} + | {list, [deeptpl()]} + | {tuple, [deeptpl()]} + | scalar() + | function() + | pid() + | port() + | reference(). + +-type placeholder() :: {var, var()}. +-type var() :: _Name :: [binary()]. + +-type scalar() :: atom() | unicode:chardata() | number(). +-type binding() :: scalar() | list(scalar()) | bindings(). +-type bindings() :: #{atom() | binary() => binding()}. + +-type var_trans() :: + fun((Value :: term()) -> unicode:chardata()) + | fun((var(), Value :: term()) -> unicode:chardata()). + +-type parse_opts() :: #{ + strip_double_quote => boolean() +}. + +-type render_opts() :: #{ + var_trans => var_trans() +}. + +-define(RE_PLACEHOLDER, "\\$(\\$?)\\{[.]?([a-zA-Z0-9._]*)\\}"). + +%% @doc Parse a unicode string into a template. +%% String might contain zero or more of placeholders in the form of `${var}`, +%% where `var` is a _location_ (possibly deeply nested) of some value in the +%% bindings map. +%% String might contain special escaped form `$${...}` which interpreted as a +%% literal `${...}`. +-spec parse(String :: unicode:chardata()) -> + t(). +parse(String) -> + parse(String, #{}). + +-spec parse(String :: unicode:chardata(), parse_opts()) -> + t(). +parse(String, Opts) -> + RE = + case Opts of + #{strip_double_quote := true} -> + <<"((?|" ?RE_PLACEHOLDER "|\"" ?RE_PLACEHOLDER "\"))">>; + #{} -> + <<"(" ?RE_PLACEHOLDER ")">> + end, + Splits = re:split(String, RE, [{return, binary}, group, trim, unicode]), + Components = lists:flatmap(fun parse_split/1, Splits), + Components. + +parse_split([Part, _PH, <<>>, Var]) -> + % Regular placeholder + prepend(Part, [{var, parse_var(Var)}]); +parse_split([Part, _PH = <>, <<"$">>, _]) -> + % Escaped literal, take all but the second byte, which is always `$`. + % Important to make a whole token starting with `$` so the `unparse/11` + % function can distinguish escaped literals. + prepend(Part, [<>]); +parse_split([Tail]) -> + [Tail]. + +prepend(<<>>, To) -> + To; +prepend(Head, To) -> + [Head | To]. + +parse_var(Var) -> + case string:split(Var, <<".">>, all) of + [<<>>] -> + ?PH_VAR_THIS; + Name -> + % TODO: lowercase? + Name + end. + +-spec validate([var() | binary()], t()) -> + ok | {error, [_Error :: {var(), disallowed}]}. +validate(AllowedIn, Template) -> + Allowed = [try_parse_var(V) || V <- AllowedIn], + {_, Errors} = render(Template, #{}), + {Used, _} = lists:unzip(Errors), + case lists:usort(Used) -- Allowed of + [] -> + ok; + Disallowed -> + {error, [{Var, disallowed} || Var <- Disallowed]} + end. + +try_parse_var(Var) when is_binary(Var) -> + parse_var(Var); +try_parse_var(Name) when is_list(Name) -> + Name. + +-spec trivial(t()) -> + boolean(). +trivial(Template) -> + validate([], Template) == ok. + +-spec unparse(t()) -> + unicode:chardata(). +unparse({'$tpl', Template}) -> + unparse_deep(Template); +unparse(Template) -> + lists:map(fun unparse_part/1, Template). + +unparse_part({var, Name}) -> + render_placeholder(Name); +unparse_part(Part = <<"${", _/binary>>) -> + <<"$", Part/binary>>; +unparse_part(Part) -> + Part. + +render_placeholder(Name) -> + "${" ++ lists:join($., Name) ++ "}". + +%% @doc Render a template with given bindings. +%% Returns a term with all placeholders replaced with values from bindings. +%% If one or more placeholders are not found in bindings, an error is returned. +%% By default, all binding values are converted to strings using `to_string/1` +%% function. Option `var_trans` can be used to override this behaviour. +-spec render(t(), bindings()) -> + {term(), [_Error :: {var(), undefined}]}. +render(Template, Bindings) -> + render(Template, Bindings, #{}). + +-spec render(t(), bindings(), render_opts()) -> + {term(), [_Error :: {var(), undefined}]}. +render(Template, Bindings, Opts) when is_list(Template) -> + lists:mapfoldl( + fun + ({var, Name}, EAcc) -> + {String, Errors} = render_binding(Name, Bindings, Opts), + {String, Errors ++ EAcc}; + (String, EAcc) -> + {String, EAcc} + end, + [], + Template + ); +render({'$tpl', Template}, Bindings, Opts) -> + render_deep(Template, Bindings, Opts). + +render_binding(Name, Bindings, Opts) -> + case lookup_var(Name, Bindings) of + {ok, Value} -> + {render_value(Name, Value, Opts), []}; + {error, Reason} -> + % TODO + % Currently, it's not possible to distinguish between a missing value + % and an atom `undefined` in `TransFun`. + {render_value(Name, undefined, Opts), [{Name, Reason}]} + end. + +render_value(_Name, Value, #{var_trans := TransFun}) when is_function(TransFun, 1) -> + TransFun(Value); +render_value(Name, Value, #{var_trans := TransFun}) when is_function(TransFun, 2) -> + TransFun(Name, Value); +render_value(_Name, Value, #{}) -> + to_string(Value). + +-spec render_strict(t(), bindings()) -> + unicode:chardata(). +render_strict(Template, Bindings) -> + render_strict(Template, Bindings, #{}). + +-spec render_strict(t(), bindings(), render_opts()) -> + unicode:chardata(). +render_strict(Template, Bindings, Opts) -> + case render(Template, Bindings, Opts) of + {String, []} -> + String; + {_, Errors = [_ | _]} -> + error(Errors, [unicode:characters_to_list(unparse(Template)), Bindings]) + end. + +%% @doc Parse an arbitrary Erlang term into a "deep" template. +%% Any binaries nested in the term are treated as string templates, while +%% lists are not analyzed for "printability" and are treated as nested terms. +%% The result is a usual template, and can be fed to other functions in this +%% module. +-spec parse_deep(unicode:chardata()) -> + t(). +parse_deep(Term) -> + parse_deep(Term, #{}). + +-spec parse_deep(unicode:chardata(), parse_opts()) -> + t(). +parse_deep(Term, Opts) -> + {'$tpl', parse_deep_term(Term, Opts)}. + +parse_deep_term(Term, Opts) when is_map(Term) -> + maps:fold( + fun(K, V, Acc) -> + Acc#{parse_deep_term(K, Opts) => parse_deep_term(V, Opts)} + end, + #{}, + Term + ); +parse_deep_term(Term, Opts) when is_list(Term) -> + {list, [parse_deep_term(E, Opts) || E <- Term]}; +parse_deep_term(Term, Opts) when is_tuple(Term) -> + {tuple, [parse_deep_term(E, Opts) || E <- tuple_to_list(Term)]}; +parse_deep_term(Term, Opts) when is_binary(Term) -> + parse(Term, Opts); +parse_deep_term(Term, _Opts) -> + Term. + +render_deep(Template, Bindings, Opts) when is_map(Template) -> + maps:fold( + fun(KT, VT, {Acc, Errors}) -> + {K, KErrors} = render_deep(KT, Bindings, Opts), + {V, VErrors} = render_deep(VT, Bindings, Opts), + {Acc#{K => V}, KErrors ++ VErrors ++ Errors} + end, + {#{}, []}, + Template + ); +render_deep({list, Template}, Bindings, Opts) when is_list(Template) -> + lists:mapfoldr( + fun(T, Errors) -> + {E, VErrors} = render_deep(T, Bindings, Opts), + {E, VErrors ++ Errors} + end, + [], + Template + ); +render_deep({tuple, Template}, Bindings, Opts) when is_list(Template) -> + {Term, Errors} = render_deep({list, Template}, Bindings, Opts), + {list_to_tuple(Term), Errors}; +render_deep(Template, Bindings, Opts) when is_list(Template) -> + {String, Errors} = render(Template, Bindings, Opts), + {unicode:characters_to_binary(String), Errors}; +render_deep(Term, _Bindings, _Opts) -> + {Term, []}. + +unparse_deep(Template) when is_map(Template) -> + maps:fold( + fun(K, V, Acc) -> + Acc#{unparse_deep(K) => unparse_deep(V)} + end, + #{}, + Template + ); +unparse_deep({list, Template}) when is_list(Template) -> + [unparse_deep(E) || E <- Template]; +unparse_deep({tuple, Template}) when is_list(Template) -> + list_to_tuple(unparse_deep({list, Template})); +unparse_deep(Template) when is_list(Template) -> + unicode:characters_to_binary(unparse(Template)); +unparse_deep(Term) -> + Term. + +%% + +-spec lookup_var(var(), bindings()) -> + {ok, binding()} | {error, undefined}. +lookup_var(?PH_VAR_THIS, Value) -> + {ok, Value}; +lookup_var([], Value) -> + {ok, Value}; +lookup_var([Prop | Rest], Bindings) -> + case lookup(Prop, Bindings) of + {ok, Value} -> + lookup_var(Rest, Value); + {error, Reason} -> + {error, Reason} + end. + +-spec lookup(Prop :: binary(), bindings()) -> + {ok, binding()} | {error, undefined}. +lookup(Prop, Bindings) when is_binary(Prop) -> + case maps:get(Prop, Bindings, undefined) of + undefined -> + try + {ok, maps:get(binary_to_existing_atom(Prop, utf8), Bindings)} + catch + error:{badkey, _} -> + {error, undefined}; + error:badarg -> + {error, undefined} + end; + Value -> + {ok, Value} + end. + +-spec to_string(binding()) -> + unicode:chardata(). +to_string(undefined) -> + []; +to_string(Bin) when is_binary(Bin) -> Bin; +to_string(Num) when is_integer(Num) -> integer_to_binary(Num); +to_string(Num) when is_float(Num) -> float_to_binary(Num, [{decimals, 10}, compact]); +to_string(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_string(Map) when is_map(Map) -> emqx_utils_json:encode(Map); +to_string(List) when is_list(List) -> + case io_lib:printable_unicode_list(List) of + true -> List; + false -> emqx_utils_json:encode(List) + end. diff --git a/apps/emqx_connector/src/emqx_connector_template_sql.erl b/apps/emqx_connector/src/emqx_connector_template_sql.erl new file mode 100644 index 000000000..0febfe575 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_template_sql.erl @@ -0,0 +1,135 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_template_sql). + +-export([parse/1]). +-export([parse/2]). +-export([render/3]). +-export([render_strict/3]). + +-export([parse_prepstmt/2]). +-export([render_prepstmt/2]). +-export([render_prepstmt_strict/2]). + +-export_type([row_template/0]). + +-type template() :: emqx_connector_template:t(). +-type row_template() :: [emqx_connector_template:placeholder()]. +-type bindings() :: emqx_connector_template:bindings(). + +-type values() :: [emqx_connector_sql:value()]. + +-type parse_opts() :: #{ + parameters => '$n' | '?', + % Inherited from `emqx_connector_template:parse_opts()` + strip_double_quote => boolean() +}. + +-type render_opts() :: #{ + escaping => mysql | cql | sql +}. + +-define(TEMPLATE_PARSE_OPTS, [strip_double_quote]). + +%% + +%% @doc Parse an SQL statement string with zero or more placeholders into a template. +-spec parse(unicode:chardata()) -> + template(). +parse(String) -> + parse(String, #{}). + +%% @doc Parse an SQL statement string with zero or more placeholders into a template. +-spec parse(unicode:chardata(), parse_opts()) -> + template(). +parse(String, Opts) -> + emqx_connector_template:parse(String, Opts). + +%% @doc Render an SQL statement template given a set of bindings. +%% Interpolation generally follows the SQL syntax, strings are escaped according to the +%% `escaping` option. +-spec render(template(), bindings(), render_opts()) -> + {unicode:chardata(), [_Error]}. +render(Template, Bindings, Opts) -> + emqx_connector_template:render(Template, Bindings, #{ + var_trans => fun(Value) -> emqx_connector_sql:to_sql_string(Value, Opts) end + }). + +%% @doc Render an SQL statement template given a set of bindings. +%% Errors are raised if any placeholders are not bound. +-spec render_strict(template(), bindings(), render_opts()) -> + unicode:chardata(). +render_strict(Template, Bindings, Opts) -> + emqx_connector_template:render_strict(Template, Bindings, #{ + var_trans => fun(Value) -> emqx_connector_sql:to_sql_string(Value, Opts) end + }). + +%% @doc Parse an SQL statement string into a prepared statement and a row template. +%% The row template is a template for a row of SQL values to be inserted to a database +%% during the execution of the prepared statement. +%% Example: +%% ``` +%% {Statement, RowTemplate} = emqx_connector_template_sql:parse_prepstmt( +%% "INSERT INTO table (id, name, age) VALUES (${id}, ${name}, 42)", +%% #{parameters => '$n'} +%% ), +%% Statement = <<"INSERT INTO table (id, name, age) VALUES ($1, $2, 42)">>, +%% RowTemplate = [{var, [...]}, ...] +%% ``` +-spec parse_prepstmt(unicode:chardata(), parse_opts()) -> + {unicode:chardata(), row_template()}. +parse_prepstmt(String, Opts) -> + Template = emqx_connector_template:parse(String, maps:with(?TEMPLATE_PARSE_OPTS, Opts)), + Statement = mk_prepared_statement(Template, Opts), + Placeholders = [Placeholder || Placeholder = {var, _} <- Template], + {Statement, Placeholders}. + +mk_prepared_statement(Template, Opts) -> + ParameterFormat = maps:get(parameters, Opts, '?'), + {Statement, _} = + lists:mapfoldl( + fun + ({var, _}, Acc) -> + mk_replace(ParameterFormat, Acc); + (String, Acc) -> + {String, Acc} + end, + 1, + Template + ), + Statement. + +mk_replace('?', Acc) -> + {"?", Acc}; +mk_replace('$n', N) -> + {"$" ++ integer_to_list(N), N + 1}. + +%% @doc Render a row template into a list of SQL values. +%% An _SQL value_ is a vaguely defined concept here, it is something that's considered +%% compatible with the protocol of the database being used. See the definition of +%% `emqx_connector_sql:value()` for more details. +-spec render_prepstmt(template(), bindings()) -> + {values(), [_Error]}. +render_prepstmt(Template, Bindings) -> + Opts = #{var_trans => fun emqx_connector_sql:to_sql_value/1}, + emqx_connector_template:render(Template, Bindings, Opts). + +-spec render_prepstmt_strict(template(), bindings()) -> + values(). +render_prepstmt_strict(Template, Bindings) -> + Opts = #{var_trans => fun emqx_connector_sql:to_sql_value/1}, + emqx_connector_template:render_strict(Template, Bindings, Opts). diff --git a/apps/emqx_connector/src/emqx_connector_utils.erl b/apps/emqx_connector/src/emqx_connector_utils.erl deleted file mode 100644 index 6000f6be5..000000000 --- a/apps/emqx_connector/src/emqx_connector_utils.erl +++ /dev/null @@ -1,35 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_utils). - --export([split_insert_sql/1]). - -%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> -split_insert_sql(SQL) -> - case re:split(SQL, "((?i)values)", [{return, binary}]) of - [Part1, _, Part3] -> - case string:trim(Part1, leading) of - <<"insert", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - <<"INSERT", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - _ -> - {error, not_insert_sql} - end; - _ -> - {error, not_insert_sql} - end. diff --git a/apps/emqx_connector/test/emqx_connector_template_SUITE.erl b/apps/emqx_connector/test/emqx_connector_template_SUITE.erl new file mode 100644 index 000000000..666fbfa58 --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_template_SUITE.erl @@ -0,0 +1,323 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_template_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +t_render(_) -> + Bindings = #{ + a => <<"1">>, + b => 1, + c => 1.0, + d => #{<<"d1">> => <<"hi">>}, + l => [0, 1, 1000], + u => "utf-8 is ǝɹǝɥ" + }, + Template = emqx_connector_template:parse( + <<"a:${a},b:${b},c:${c},d:${d},d1:${d.d1},l:${l},u:${u}">> + ), + ?assertEqual( + {<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"},d1:hi,l:[0,1,1000],u:utf-8 is ǝɹǝɥ"/utf8>>, []}, + render_string(Template, Bindings) + ). + +t_render_var_trans(_) -> + Bindings = #{a => <<"1">>, b => 1, c => #{prop => 1.0}}, + Template = emqx_connector_template:parse(<<"a:${a},b:${b},c:${c.prop}">>), + {String, Errors} = emqx_connector_template:render( + Template, + Bindings, + #{var_trans => fun(Name, _) -> "<" ++ lists:join($., Name) ++ ">" end} + ), + ?assertEqual( + {<<"a:,b:,c:">>, []}, + {bin(String), Errors} + ). + +t_render_path(_) -> + Bindings = #{d => #{d1 => <<"hi">>}}, + Template = emqx_connector_template:parse(<<"d.d1:${d.d1}">>), + ?assertEqual( + ok, + emqx_connector_template:validate([<<"d.d1">>], Template) + ), + ?assertEqual( + {<<"d.d1:hi">>, []}, + render_string(Template, Bindings) + ). + +t_render_custom_ph(_) -> + Bindings = #{a => <<"a">>, b => <<"b">>}, + Template = emqx_connector_template:parse(<<"a:${a},b:${b}">>), + ?assertEqual( + {error, [{[<<"b">>], disallowed}]}, + emqx_connector_template:validate([<<"a">>], Template) + ), + ?assertEqual( + <<"a:a,b:b">>, + render_strict_string(Template, Bindings) + ). + +t_render_this(_) -> + Bindings = #{a => <<"a">>, b => [1, 2, 3]}, + Template = emqx_connector_template:parse(<<"this:${} / also:${.}">>), + ?assertEqual(ok, emqx_connector_template:validate([?PH_VAR_THIS], Template)), + ?assertEqual( + % NOTE: order of the keys in the JSON object depends on the JSON encoder + <<"this:{\"b\":[1,2,3],\"a\":\"a\"} / also:{\"b\":[1,2,3],\"a\":\"a\"}">>, + render_strict_string(Template, Bindings) + ). + +t_render_missing_bindings(_) -> + Bindings = #{no => #{}}, + Template = emqx_connector_template:parse( + <<"a:${a},b:${b},c:${c},d:${d.d1},e:${no.such_atom_i_swear}">> + ), + ?assertEqual( + {<<"a:,b:,c:,d:,e:">>, [ + {[<<"no">>, <<"such_atom_i_swear">>], undefined}, + {[<<"d">>, <<"d1">>], undefined}, + {[<<"c">>], undefined}, + {[<<"b">>], undefined}, + {[<<"a">>], undefined} + ]}, + render_string(Template, Bindings) + ), + ?assertError( + [ + {[<<"no">>, <<"such_atom_i_swear">>], undefined}, + {[<<"d">>, <<"d1">>], undefined}, + {[<<"c">>], undefined}, + {[<<"b">>], undefined}, + {[<<"a">>], undefined} + ], + render_strict_string(Template, Bindings) + ). + +t_unparse(_) -> + TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}}">>, + Template = emqx_connector_template:parse(TString), + ?assertEqual( + TString, + unicode:characters_to_binary(emqx_connector_template:unparse(Template)) + ). + +t_trivial(_) -> + ?assertEqual( + true, + emqx_connector_template:trivial(emqx_connector_template:parse(<<"">>)) + ), + ?assertEqual( + false, + emqx_connector_template:trivial(emqx_connector_template:parse(<<"a:${a},b:${b},c:$${c}">>)) + ), + ?assertEqual( + true, + emqx_connector_template:trivial( + emqx_connector_template:parse(<<"a:$${a},b:$${b},c:$${c}">>) + ) + ). + +t_render_partial_ph(_) -> + Bindings = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + Template = emqx_connector_template:parse(<<"a:$a,b:b},c:{c},d:${d">>), + ?assertEqual( + <<"a:$a,b:b},c:{c},d:${d">>, + render_strict_string(Template, Bindings) + ). + +t_parse_escaped(_) -> + Bindings = #{a => <<"1">>, b => 1}, + Template = emqx_connector_template:parse(<<"a:${a},b:$${b}">>), + ?assertEqual( + <<"a:1,b:${b}">>, + render_strict_string(Template, Bindings) + ). + +t_parse_escaped_dquote(_) -> + Bindings = #{a => <<"1">>, b => 1}, + Template = emqx_connector_template:parse(<<"a:\"${a}\",b:\"$${b}\"">>, #{ + strip_double_quote => true + }), + ?assertEqual( + <<"a:1,b:\"${b}\"">>, + render_strict_string(Template, Bindings) + ). + +t_parse_sql_prepstmt(_) -> + Bindings = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + {PrepareStatement, RowTemplate} = + emqx_connector_template_sql:parse_prepstmt(<<"a:${a},b:${b},c:${c},d:${d}">>, #{ + parameters => '?' + }), + ?assertEqual(<<"a:?,b:?,c:?,d:?">>, bin(PrepareStatement)), + ?assertEqual( + {[<<"1">>, 1, 1.0, <<"{\"d1\":\"hi\"}">>], _Errors = []}, + emqx_connector_template_sql:render_prepstmt(RowTemplate, Bindings) + ). + +t_parse_sql_prepstmt_n(_) -> + Bindings = #{a => undefined, b => true, c => atom, d => #{d1 => 42.1337}}, + {PrepareStatement, RowTemplate} = + emqx_connector_template_sql:parse_prepstmt(<<"a:${a},b:${b},c:${c},d:${d}">>, #{ + parameters => '$n' + }), + ?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, bin(PrepareStatement)), + ?assertEqual( + [null, true, <<"atom">>, <<"{\"d1\":42.1337}">>], + emqx_connector_template_sql:render_prepstmt_strict(RowTemplate, Bindings) + ). + +t_parse_sql_prepstmt_partial_ph(_) -> + Bindings = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + {PrepareStatement, RowTemplate} = + emqx_connector_template_sql:parse_prepstmt(<<"a:$a,b:b},c:{c},d:${d">>, #{parameters => '?'}), + ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, bin(PrepareStatement)), + ?assertEqual([], emqx_connector_template_sql:render_prepstmt_strict(RowTemplate, Bindings)). + +t_render_sql(_) -> + Bindings = #{ + a => <<"1">>, + b => 1, + c => 1.0, + d => #{d1 => <<"hi">>}, + n => undefined, + u => "utf8's cool 🐸" + }, + Template = emqx_connector_template:parse(<<"a:${a},b:${b},c:${c},d:${d},n:${n},u:${u}">>), + ?assertMatch( + {_String, _Errors = []}, + emqx_connector_template_sql:render(Template, Bindings, #{}) + ), + ?assertEqual( + <<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}',n:NULL,u:'utf8\\'s cool 🐸'"/utf8>>, + bin(emqx_connector_template_sql:render_strict(Template, Bindings, #{})) + ). + +t_render_mysql(_) -> + %% with apostrophes + %% https://github.com/emqx/emqx/issues/4135 + Bindings = #{ + a => <<"1''2">>, + b => 1, + c => 1.0, + d => #{d1 => <<"someone's phone">>}, + e => <<$\\, 0, "💩"/utf8>>, + f => <<"non-utf8", 16#DCC900:24>>, + g => "utf8's cool 🐸", + h => imgood + }, + Template = emqx_connector_template_sql:parse( + <<"a:${a},b:${b},c:${c},d:${d},e:${e},f:${f},g:${g},h:${h}">> + ), + ?assertEqual( + << + "a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'," + "e:'\\\\\\0💩',f:0x6E6F6E2D75746638DCC900,g:'utf8\\'s cool 🐸',"/utf8, + "h:'imgood'" + >>, + bin(emqx_connector_template_sql:render_strict(Template, Bindings, #{escaping => mysql})) + ). + +t_render_cql(_) -> + %% with apostrophes for cassandra + %% https://github.com/emqx/emqx/issues/4148 + Bindings = #{ + a => <<"1''2">>, + b => 1, + c => 1.0, + d => #{d1 => <<"someone's phone">>} + }, + Template = emqx_connector_template:parse(<<"a:${a},b:${b},c:${c},d:${d}">>), + ?assertEqual( + <<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, + bin(emqx_connector_template_sql:render_strict(Template, Bindings, #{escaping => cql})) + ). + +t_render_sql_custom_ph(_) -> + {PrepareStatement, RowTemplate} = + emqx_connector_template_sql:parse_prepstmt(<<"a:${a},b:${b}">>, #{parameters => '$n'}), + ?assertEqual( + {error, [{[<<"b">>], disallowed}]}, + emqx_connector_template:validate([<<"a">>], RowTemplate) + ), + ?assertEqual(<<"a:$1,b:$2">>, bin(PrepareStatement)). + +t_render_sql_strip_double_quote(_) -> + Bindings = #{a => <<"a">>, b => <<"b">>}, + + %% no strip_double_quote option: "${key}" -> "value" + {PrepareStatement1, RowTemplate1} = emqx_connector_template_sql:parse_prepstmt( + <<"a:\"${a}\",b:\"${b}\"">>, + #{parameters => '$n'} + ), + ?assertEqual(<<"a:\"$1\",b:\"$2\"">>, bin(PrepareStatement1)), + ?assertEqual( + [<<"a">>, <<"b">>], + emqx_connector_template_sql:render_prepstmt_strict(RowTemplate1, Bindings) + ), + + %% strip_double_quote = true: "${key}" -> value + {PrepareStatement2, RowTemplate2} = emqx_connector_template_sql:parse_prepstmt( + <<"a:\"${a}\",b:\"${b}\"">>, + #{parameters => '$n', strip_double_quote => true} + ), + ?assertEqual(<<"a:$1,b:$2">>, bin(PrepareStatement2)), + ?assertEqual( + [<<"a">>, <<"b">>], + emqx_connector_template_sql:render_prepstmt_strict(RowTemplate2, Bindings) + ). + +t_render_tmpl_deep(_) -> + Bindings = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + + Template = emqx_connector_template:parse_deep( + #{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>, <<"$${d}">>], 0}]} + ), + + ?assertEqual( + {error, [{V, disallowed} || V <- [[<<"b">>], [<<"c">>]]]}, + emqx_connector_template:validate([<<"a">>], Template) + ), + + ?assertEqual( + #{<<"1">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>, <<"${d}">>], 0}]}, + emqx_connector_template:render_strict(Template, Bindings) + ). + +t_unparse_tmpl_deep(_) -> + Term = #{<<"${a}">> => [<<"$${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}, + Template = emqx_connector_template:parse_deep(Term), + ?assertEqual(Term, emqx_connector_template:unparse(Template)). + +%% + +render_string(Template, Bindings) -> + {String, Errors} = emqx_connector_template:render(Template, Bindings), + {bin(String), Errors}. + +render_strict_string(Template, Bindings) -> + bin(emqx_connector_template:render_strict(Template, Bindings)). + +bin(String) -> + unicode:characters_to_binary(String).