fix(tpl): ensure full backward compat in basic connectors
This commit is contained in:
parent
02c1bd70b6
commit
b5b6c3f8cc
|
@ -573,7 +573,7 @@ render_headers(HeaderTks, Msg) ->
|
|||
|
||||
render_template(Template, Msg) ->
|
||||
% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
|
||||
{String, _Errors} = emqx_template:render(Template, Msg),
|
||||
{String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}),
|
||||
String.
|
||||
|
||||
render_template_string(Template, Msg) ->
|
||||
|
|
|
@ -565,6 +565,7 @@ t_simple_sql_query(Config) ->
|
|||
ok.
|
||||
|
||||
t_missing_data(Config) ->
|
||||
BatchSize = ?config(batch_size, Config),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
|
@ -575,13 +576,27 @@ t_missing_data(Config) ->
|
|||
),
|
||||
send_message(Config, #{}),
|
||||
{ok, [Event]} = snabbkaffe:receive_events(SRef),
|
||||
?assertMatch(
|
||||
#{
|
||||
result :=
|
||||
{error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
|
||||
},
|
||||
Event
|
||||
),
|
||||
case BatchSize of
|
||||
N when N > 1 ->
|
||||
?assertMatch(
|
||||
#{
|
||||
result :=
|
||||
{error,
|
||||
{unrecoverable_error,
|
||||
{1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}
|
||||
},
|
||||
Event
|
||||
);
|
||||
1 ->
|
||||
?assertMatch(
|
||||
#{
|
||||
result :=
|
||||
{error,
|
||||
{unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
|
||||
},
|
||||
Event
|
||||
)
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_bad_sql_parameter(Config) ->
|
||||
|
|
|
@ -426,8 +426,12 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -
|
|||
undefined ->
|
||||
{SQLOrData, Params};
|
||||
{_InsertPart, RowTemplate} ->
|
||||
% NOTE: ignoring errors here, missing variables are set to `null`.
|
||||
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, SQLOrData),
|
||||
% NOTE
|
||||
% Ignoring errors here, missing variables are set to `null`.
|
||||
{Row, _Errors} = emqx_template_sql:render_prepstmt(
|
||||
RowTemplate,
|
||||
{emqx_jsonish, SQLOrData}
|
||||
),
|
||||
{TypeOrKey, Row}
|
||||
end.
|
||||
|
||||
|
@ -437,8 +441,11 @@ on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
|
|||
on_sql_query(InstId, query, Query, no_params, default_timeout, State).
|
||||
|
||||
render_row(RowTemplate, Data) ->
|
||||
% NOTE: ignoring errors here, missing variables are set to "NULL".
|
||||
{Row, _Errors} = emqx_template_sql:render(RowTemplate, Data, #{escaping => mysql}),
|
||||
% NOTE
|
||||
% Ignoring errors here, missing variables are set to "'undefined'" due to backward
|
||||
% compatibility requirements.
|
||||
RenderOpts = #{escaping => mysql, undefined => <<"undefined">>},
|
||||
{Row, _Errors} = emqx_template_sql:render(RowTemplate, {emqx_jsonish, Data}, RenderOpts),
|
||||
Row.
|
||||
|
||||
on_sql_query(
|
||||
|
|
|
@ -313,8 +313,8 @@ do_check_prepares(
|
|||
case validate_table_existence(WorkerPids, SQL) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, undefined_table} ->
|
||||
{error, {undefined_table, State}}
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
|
||||
ok;
|
||||
|
@ -433,7 +433,7 @@ parse_prepare_sql(Key, Query, Acc) ->
|
|||
|
||||
render_prepare_sql_row(RowTemplate, Data) ->
|
||||
% NOTE: ignoring errors here, missing variables will be replaced with `null`.
|
||||
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, Data),
|
||||
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
|
||||
Row.
|
||||
|
||||
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
|
||||
|
@ -443,10 +443,13 @@ init_prepare(State = #{}) ->
|
|||
{ok, PrepStatements} ->
|
||||
State#{prepares => PrepStatements};
|
||||
Error ->
|
||||
?SLOG(error, maps:merge(
|
||||
#{msg => <<"postgresql_init_prepare_statement_failed">>},
|
||||
translate_to_log_context(Error)
|
||||
)),
|
||||
?SLOG(
|
||||
error,
|
||||
maps:merge(
|
||||
#{msg => <<"postgresql_init_prepare_statement_failed">>},
|
||||
translate_to_log_context(Error)
|
||||
)
|
||||
),
|
||||
%% mark the prepares failed
|
||||
State#{prepares => Error}
|
||||
end.
|
||||
|
@ -484,7 +487,7 @@ prepare_sql_to_conn(Conn, Prepares) ->
|
|||
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
|
||||
{ok, Statements};
|
||||
prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
|
||||
LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, sql => SQL},
|
||||
LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
|
||||
?SLOG(info, LogMeta),
|
||||
case epgsql:parse2(Conn, Key, SQL, []) of
|
||||
{ok, Statement} ->
|
||||
|
|
|
@ -249,15 +249,15 @@ bin(Val) -> emqx_utils_conv:bin(Val).
|
|||
|
||||
-spec quote_sql(_Value) -> iolist().
|
||||
quote_sql(Str) ->
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => sql}).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => sql, undefined => <<"undefined">>}).
|
||||
|
||||
-spec quote_cql(_Value) -> iolist().
|
||||
quote_cql(Str) ->
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => cql}).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => cql, undefined => <<"undefined">>}).
|
||||
|
||||
-spec quote_mysql(_Value) -> iolist().
|
||||
quote_mysql(Str) ->
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => mysql, undefined => <<"undefined">>}).
|
||||
|
||||
lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
|
||||
Value;
|
||||
|
|
|
@ -40,7 +40,12 @@
|
|||
}.
|
||||
|
||||
-type render_opts() :: #{
|
||||
escaping => mysql | cql | sql
|
||||
%% String escaping rules to use.
|
||||
%% Default: `sql` (generic)
|
||||
escaping => sql | mysql | cql,
|
||||
%% Value to map `undefined` to, either to NULLs or to arbitrary strings.
|
||||
%% Default: `null`
|
||||
undefined => null | unicode:chardata()
|
||||
}.
|
||||
|
||||
-define(TEMPLATE_PARSE_OPTS, [strip_double_quote]).
|
||||
|
|
|
@ -82,8 +82,13 @@ to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map).
|
|||
%% SQL statements. The value is escaped if necessary.
|
||||
-spec to_sql_string(term(), Options) -> unicode:chardata() when
|
||||
Options :: #{
|
||||
escaping => cql | mysql | sql
|
||||
escaping => mysql | sql | cql,
|
||||
undefined => null | unicode:chardata()
|
||||
}.
|
||||
to_sql_string(undefined, #{undefined := Str} = Opts) when Str =/= null ->
|
||||
to_sql_string(Str, Opts);
|
||||
to_sql_string(undefined, #{}) ->
|
||||
<<"NULL">>;
|
||||
to_sql_string(String, #{escaping := mysql}) when is_binary(String) ->
|
||||
try
|
||||
escape_mysql(String)
|
||||
|
@ -99,8 +104,6 @@ to_sql_string(Term, #{}) ->
|
|||
maybe_escape(Term, fun escape_sql/1).
|
||||
|
||||
-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> unicode:chardata().
|
||||
maybe_escape(undefined, _EscapeFun) ->
|
||||
<<"NULL">>;
|
||||
maybe_escape(Str, EscapeFun) when is_binary(Str) ->
|
||||
EscapeFun(Str);
|
||||
maybe_escape(Str, EscapeFun) when is_list(Str) ->
|
||||
|
|
|
@ -235,6 +235,10 @@ t_render_sql(_) ->
|
|||
?assertEqual(
|
||||
<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}',n:NULL,u:'utf8\\'s cool 🐸'"/utf8>>,
|
||||
bin(emqx_template_sql:render_strict(Template, Context, #{}))
|
||||
),
|
||||
?assertEqual(
|
||||
<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}',n:'undefined',u:'utf8\\'s cool 🐸'"/utf8>>,
|
||||
bin(emqx_template_sql:render_strict(Template, Context, #{undefined => "undefined"}))
|
||||
).
|
||||
|
||||
t_render_mysql(_) ->
|
||||
|
|
Loading…
Reference in New Issue