diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 88f55af52..b2f876d21 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -573,7 +573,7 @@ render_headers(HeaderTks, Msg) -> render_template(Template, Msg) -> % NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`. - {String, _Errors} = emqx_template:render(Template, Msg), + {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}), String. render_template_string(Template, Msg) -> diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 2eeccfd77..a34b65ede 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -565,6 +565,7 @@ t_simple_sql_query(Config) -> ok. t_missing_data(Config) -> + BatchSize = ?config(batch_size, Config), ?assertMatch( {ok, _}, create_bridge(Config) @@ -575,13 +576,27 @@ t_missing_data(Config) -> ), send_message(Config, #{}), {ok, [Event]} = snabbkaffe:receive_events(SRef), - ?assertMatch( - #{ - result := - {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}} - }, - Event - ), + case BatchSize of + N when N > 1 -> + ?assertMatch( + #{ + result := + {error, + {unrecoverable_error, + {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}} + }, + Event + ); + 1 -> + ?assertMatch( + #{ + result := + {error, + {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}} + }, + Event + ) + end, ok. t_bad_sql_parameter(Config) -> diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index e052b9b89..d8b7994ab 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -426,8 +426,12 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) - undefined -> {SQLOrData, Params}; {_InsertPart, RowTemplate} -> - % NOTE: ignoring errors here, missing variables are set to `null`. - {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, SQLOrData), + % NOTE + % Ignoring errors here, missing variables are set to `null`. + {Row, _Errors} = emqx_template_sql:render_prepstmt( + RowTemplate, + {emqx_jsonish, SQLOrData} + ), {TypeOrKey, Row} end. @@ -437,8 +441,11 @@ on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) -> on_sql_query(InstId, query, Query, no_params, default_timeout, State). render_row(RowTemplate, Data) -> - % NOTE: ignoring errors here, missing variables are set to "NULL". - {Row, _Errors} = emqx_template_sql:render(RowTemplate, Data, #{escaping => mysql}), + % NOTE + % Ignoring errors here, missing variables are set to "'undefined'" due to backward + % compatibility requirements. + RenderOpts = #{escaping => mysql, undefined => <<"undefined">>}, + {Row, _Errors} = emqx_template_sql:render(RowTemplate, {emqx_jsonish, Data}, RenderOpts), Row. on_sql_query( diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 3f7b43c79..814d8a074 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -313,8 +313,8 @@ do_check_prepares( case validate_table_existence(WorkerPids, SQL) of ok -> ok; - {error, undefined_table} -> - {error, {undefined_table, State}} + {error, Reason} -> + {error, Reason} end; do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> ok; @@ -433,7 +433,7 @@ parse_prepare_sql(Key, Query, Acc) -> render_prepare_sql_row(RowTemplate, Data) -> % NOTE: ignoring errors here, missing variables will be replaced with `null`. - {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, Data), + {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}), Row. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> @@ -443,10 +443,13 @@ init_prepare(State = #{}) -> {ok, PrepStatements} -> State#{prepares => PrepStatements}; Error -> - ?SLOG(error, maps:merge( - #{msg => <<"postgresql_init_prepare_statement_failed">>}, - translate_to_log_context(Error) - )), + ?SLOG( + error, + maps:merge( + #{msg => <<"postgresql_init_prepare_statement_failed">>}, + translate_to_log_context(Error) + ) + ), %% mark the prepares failed State#{prepares => Error} end. @@ -484,7 +487,7 @@ prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> - LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, sql => SQL}, + LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 4d386840f..90df6003b 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -249,15 +249,15 @@ bin(Val) -> emqx_utils_conv:bin(Val). -spec quote_sql(_Value) -> iolist(). quote_sql(Str) -> - emqx_utils_sql:to_sql_string(Str, #{escaping => sql}). + emqx_utils_sql:to_sql_string(Str, #{escaping => sql, undefined => <<"undefined">>}). -spec quote_cql(_Value) -> iolist(). quote_cql(Str) -> - emqx_utils_sql:to_sql_string(Str, #{escaping => cql}). + emqx_utils_sql:to_sql_string(Str, #{escaping => cql, undefined => <<"undefined">>}). -spec quote_mysql(_Value) -> iolist(). quote_mysql(Str) -> - emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}). + emqx_utils_sql:to_sql_string(Str, #{escaping => mysql, undefined => <<"undefined">>}). lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> Value; diff --git a/apps/emqx_utils/src/emqx_template_sql.erl b/apps/emqx_utils/src/emqx_template_sql.erl index 4e9d8f622..9b2c1d55c 100644 --- a/apps/emqx_utils/src/emqx_template_sql.erl +++ b/apps/emqx_utils/src/emqx_template_sql.erl @@ -40,7 +40,12 @@ }. -type render_opts() :: #{ - escaping => mysql | cql | sql + %% String escaping rules to use. + %% Default: `sql` (generic) + escaping => sql | mysql | cql, + %% Value to map `undefined` to, either to NULLs or to arbitrary strings. + %% Default: `null` + undefined => null | unicode:chardata() }. -define(TEMPLATE_PARSE_OPTS, [strip_double_quote]). diff --git a/apps/emqx_utils/src/emqx_utils_sql.erl b/apps/emqx_utils/src/emqx_utils_sql.erl index 12aac6464..9ce9e576d 100644 --- a/apps/emqx_utils/src/emqx_utils_sql.erl +++ b/apps/emqx_utils/src/emqx_utils_sql.erl @@ -82,8 +82,13 @@ to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map). %% SQL statements. The value is escaped if necessary. -spec to_sql_string(term(), Options) -> unicode:chardata() when Options :: #{ - escaping => cql | mysql | sql + escaping => mysql | sql | cql, + undefined => null | unicode:chardata() }. +to_sql_string(undefined, #{undefined := Str} = Opts) when Str =/= null -> + to_sql_string(Str, Opts); +to_sql_string(undefined, #{}) -> + <<"NULL">>; to_sql_string(String, #{escaping := mysql}) when is_binary(String) -> try escape_mysql(String) @@ -99,8 +104,6 @@ to_sql_string(Term, #{}) -> maybe_escape(Term, fun escape_sql/1). -spec maybe_escape(_Value, fun((binary()) -> iodata())) -> unicode:chardata(). -maybe_escape(undefined, _EscapeFun) -> - <<"NULL">>; maybe_escape(Str, EscapeFun) when is_binary(Str) -> EscapeFun(Str); maybe_escape(Str, EscapeFun) when is_list(Str) -> diff --git a/apps/emqx_utils/test/emqx_template_SUITE.erl b/apps/emqx_utils/test/emqx_template_SUITE.erl index f8355f769..4dfe5de2e 100644 --- a/apps/emqx_utils/test/emqx_template_SUITE.erl +++ b/apps/emqx_utils/test/emqx_template_SUITE.erl @@ -235,6 +235,10 @@ t_render_sql(_) -> ?assertEqual( <<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}',n:NULL,u:'utf8\\'s cool 🐸'"/utf8>>, bin(emqx_template_sql:render_strict(Template, Context, #{})) + ), + ?assertEqual( + <<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}',n:'undefined',u:'utf8\\'s cool 🐸'"/utf8>>, + bin(emqx_template_sql:render_strict(Template, Context, #{undefined => "undefined"})) ). t_render_mysql(_) ->