From 35902dc72db829d2ff0c4de4206aae04841533cb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 18 Apr 2023 15:34:38 +0300 Subject: [PATCH] feat(tpl): switch basic connectors to `emqx_connector_template` Also avoid `filename:join/2` in HTTP connector since it's both OS specific and an overkill. --- .../src/emqx_bridge_http_connector.erl | 120 ++++----- .../test/emqx_bridge_http_connector_tests.erl | 3 +- .../test/emqx_bridge_mysql_SUITE.erl | 30 +-- .../test/emqx_bridge_pgsql_SUITE.erl | 3 +- apps/emqx_mysql/src/emqx_mysql.erl | 248 +++++++++--------- apps/emqx_postgresql/src/emqx_postgresql.erl | 200 +++++++------- 6 files changed, 280 insertions(+), 324 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 5d1b1947c..869f081fb 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -479,61 +479,47 @@ preprocess_request( } = Req ) -> #{ - method => emqx_placeholder:preproc_tmpl(to_bin(Method)), - path => emqx_placeholder:preproc_tmpl(Path), - body => maybe_preproc_tmpl(body, Req), - headers => wrap_auth_header(preproc_headers(Headers)), + method => parse_template(to_bin(Method)), + path => parse_template(Path), + body => maybe_parse_template(body, Req), + headers => parse_headers(Headers), request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS), max_retries => maps:get(max_retries, Req, 2) }. -preproc_headers(Headers) when is_map(Headers) -> +parse_headers(Headers) when is_map(Headers) -> maps:fold( - fun(K, V, Acc) -> - [ - { - emqx_placeholder:preproc_tmpl(to_bin(K)), - emqx_placeholder:preproc_tmpl(to_bin(V)) - } - | Acc - ] - end, + fun(K, V, Acc) -> [parse_header(K, V) | Acc] end, [], Headers ); -preproc_headers(Headers) when is_list(Headers) -> +parse_headers(Headers) when is_list(Headers) -> lists:map( - fun({K, V}) -> - { - emqx_placeholder:preproc_tmpl(to_bin(K)), - emqx_placeholder:preproc_tmpl(to_bin(V)) - } - end, + fun({K, V}) -> parse_header(K, V) end, Headers ). -wrap_auth_header(Headers) -> - lists:map(fun maybe_wrap_auth_header/1, Headers). +parse_header(K, V) -> + KStr = to_bin(K), + VTpl = parse_template(to_bin(V)), + {parse_template(KStr), maybe_wrap_auth_header(KStr, VTpl)}. -maybe_wrap_auth_header({[{str, Key}] = StrKey, Val}) -> - {_, MaybeWrapped} = maybe_wrap_auth_header({Key, Val}), - {StrKey, MaybeWrapped}; -maybe_wrap_auth_header({Key, Val} = Header) when - is_binary(Key), (size(Key) =:= 19 orelse size(Key) =:= 13) +maybe_wrap_auth_header(Key, VTpl) when + (byte_size(Key) =:= 19 orelse byte_size(Key) =:= 13) -> %% We check the size of potential keys in the guard above and consider only %% those that match the number of characters of either "Authorization" or %% "Proxy-Authorization". case try_bin_to_lower(Key) of <<"authorization">> -> - {Key, emqx_secret:wrap(Val)}; + emqx_secret:wrap(VTpl); <<"proxy-authorization">> -> - {Key, emqx_secret:wrap(Val)}; + emqx_secret:wrap(VTpl); _Other -> - Header + VTpl end; -maybe_wrap_auth_header(Header) -> - Header. +maybe_wrap_auth_header(_Key, VTpl) -> + VTpl. try_bin_to_lower(Bin) -> try iolist_to_binary(string:lowercase(Bin)) of @@ -542,46 +528,57 @@ try_bin_to_lower(Bin) -> _:_ -> Bin end. -maybe_preproc_tmpl(Key, Conf) -> +maybe_parse_template(Key, Conf) -> case maps:get(Key, Conf, undefined) of undefined -> undefined; - Val -> emqx_placeholder:preproc_tmpl(Val) + Val -> parse_template(Val) end. +parse_template(String) -> + emqx_connector_template:parse(String). + process_request( #{ - method := MethodTks, - path := PathTks, - body := BodyTks, - headers := HeadersTks, + method := MethodTemplate, + path := PathTemplate, + body := BodyTemplate, + headers := HeadersTemplate, request_timeout := ReqTimeout } = Conf, Msg ) -> Conf#{ - method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)), - path => emqx_placeholder:proc_tmpl(PathTks, Msg), - body => process_request_body(BodyTks, Msg), - headers => proc_headers(HeadersTks, Msg), + method => make_method(render_template_string(MethodTemplate, Msg)), + path => unicode:characters_to_list(render_template(PathTemplate, Msg)), + body => render_request_body(BodyTemplate, Msg), + headers => render_headers(HeadersTemplate, Msg), request_timeout => ReqTimeout }. -process_request_body(undefined, Msg) -> +render_request_body(undefined, Msg) -> emqx_utils_json:encode(Msg); -process_request_body(BodyTks, Msg) -> - emqx_placeholder:proc_tmpl(BodyTks, Msg). +render_request_body(BodyTks, Msg) -> + render_template(BodyTks, Msg). -proc_headers(HeaderTks, Msg) -> +render_headers(HeaderTks, Msg) -> lists:map( fun({K, V}) -> { - emqx_placeholder:proc_tmpl(K, Msg), - emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg) + render_template_string(K, Msg), + render_template_string(emqx_secret:unwrap(V), Msg) } end, HeaderTks ). +render_template(Template, Msg) -> + % NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`. + {String, _Errors} = emqx_connector_template:render(Template, Msg), + String. + +render_template_string(Template, Msg) -> + unicode:characters_to_binary(render_template(Template, Msg)). + make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; make_method(M) when M == <<"GET">>; M == <<"get">> -> get; @@ -716,8 +713,6 @@ maybe_retry(Result, _Context, ReplyFunAndArgs) -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). %% The HOCON schema system may generate sensitive keys with this format -is_sensitive_key([{str, StringKey}]) -> - is_sensitive_key(StringKey); is_sensitive_key(Atom) when is_atom(Atom) -> is_sensitive_key(erlang:atom_to_binary(Atom)); is_sensitive_key(Bin) when is_binary(Bin), (size(Bin) =:= 19 orelse size(Bin) =:= 13) -> @@ -742,25 +737,19 @@ redact(Data) -> %% and we also can't know the body format and where the sensitive data will be %% so the easy way to keep data security is redacted the whole body redact_request({Path, Headers}) -> - {Path, redact(Headers)}; + {Path, Headers}; redact_request({Path, Headers, _Body}) -> - {Path, redact(Headers), <<"******">>}. + {Path, Headers, <<"******">>}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). redact_test_() -> - TestData1 = [ - {<<"content-type">>, <<"application/json">>}, - {<<"Authorization">>, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>} - ], - - TestData2 = #{ - headers => - [ - {[{str, <<"content-type">>}], [{str, <<"application/json">>}]}, - {[{str, <<"Authorization">>}], [{str, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>}]} - ] + TestData = #{ + headers => [ + {<<"content-type">>, <<"application/json">>}, + {<<"Authorization">>, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>} + ] }, [ ?_assert(is_sensitive_key(<<"Authorization">>)), @@ -770,8 +759,7 @@ redact_test_() -> ?_assert(is_sensitive_key('PrOxy-authoRizaTion')), ?_assertNot(is_sensitive_key(<<"Something">>)), ?_assertNot(is_sensitive_key(89)), - ?_assertNotEqual(TestData1, redact(TestData1)), - ?_assertNotEqual(TestData2, redact(TestData2)) + ?_assertNotEqual(TestData, redact(TestData)) ]. join_paths_test_() -> diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl index 6b5c2b0cd..1de210260 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl @@ -83,7 +83,8 @@ is_wrapped(Secret) when is_function(Secret) -> is_wrapped(_Other) -> false. -untmpl([{_, V} | _]) -> V. +untmpl(Tpl) -> + iolist_to_binary(emqx_connector_template:render_strict(Tpl, #{})). is_unwrapped_headers(Headers) -> lists:all(fun is_unwrapped_header/1, Headers). diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 3ed40e903..2eeccfd77 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -565,8 +565,6 @@ t_simple_sql_query(Config) -> ok. t_missing_data(Config) -> - BatchSize = ?config(batch_size, Config), - IsBatch = BatchSize > 1, ?assertMatch( {ok, _}, create_bridge(Config) @@ -577,27 +575,13 @@ t_missing_data(Config) -> ), send_message(Config, #{}), {ok, [Event]} = snabbkaffe:receive_events(SRef), - case IsBatch of - true -> - ?assertMatch( - #{ - result := - {error, - {unrecoverable_error, - {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}} - }, - Event - ); - false -> - ?assertMatch( - #{ - result := - {error, - {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}} - }, - Event - ) - end, + ?assertMatch( + #{ + result := + {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}} + }, + Event + ), ok. t_bad_sql_parameter(Config) -> diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index cd79db43d..156d4bd16 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -324,6 +324,7 @@ connect_and_drop_table(Config) -> connect_and_clear_table(Config) -> Con = connect_direct_pgsql(Config), + _ = epgsql:squery(Con, ?SQL_CREATE_TABLE), {ok, _} = epgsql:squery(Con, ?SQL_DELETE), ok = epgsql:close(Con). @@ -668,7 +669,7 @@ t_missing_table(Config) -> ok end, fun(Trace) -> - ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)), + ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)), ok end ), diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 4440bcfbb..927c9d067 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -46,16 +46,12 @@ default_port => ?MYSQL_DEFAULT_PORT }). --type prepares() :: #{atom() => binary()}. --type params_tokens() :: #{atom() => list()}. --type sqls() :: #{atom() => binary()}. +-type template() :: {unicode:chardata(), emqx_connector_template:str()}. -type state() :: #{ pool_name := binary(), - prepare_statement := prepares(), - params_tokens := params_tokens(), - batch_inserts := sqls(), - batch_params_tokens := params_tokens() + prepares := ok | {error, _}, + templates := #{{atom(), batch | prepstmt} => template()} }. %%===================================================================== @@ -154,13 +150,13 @@ on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) -> on_query( InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, - #{pool_name := PoolName, prepare_statement := Prepares} = State + State ) -> MySqlFunction = mysql_function(TypeOrKey), {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of {error, not_prepared} -> - case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of + case maybe_prepare_sql(SQLOrKey2, State) of ok -> ?tp( mysql_connector_on_query_prepared_sql, @@ -187,23 +183,27 @@ on_query( on_batch_query( InstId, - BatchReq, - #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State + BatchReq = [{Key, _} | _], + #{query_templates := Templates} = State ) -> - case hd(BatchReq) of - {Key, _} -> - case maps:get(Key, Inserts, undefined) of - undefined -> - {error, {unrecoverable_error, batch_select_not_implemented}}; - InsertSQL -> - Tokens = maps:get(Key, ParamsTokens), - on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State) - end; - Request -> - LogMeta = #{connector => InstId, first_request => Request, state => State}, - ?SLOG(error, LogMeta#{msg => "invalid request"}), - {error, {unrecoverable_error, invalid_request}} - end. + case maps:get({Key, batch}, Templates, undefined) of + undefined -> + {error, {unrecoverable_error, batch_select_not_implemented}}; + Template -> + on_batch_insert(InstId, BatchReq, Template, State) + end; +on_batch_query( + InstId, + BatchReq, + State +) -> + ?SLOG(error, #{ + msg => "invalid request", + connector => InstId, + request => BatchReq, + state => State + }), + {error, {unrecoverable_error, invalid_request}}. mysql_function(sql) -> query; @@ -222,8 +222,8 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; - {error, {undefined_table, NState}} -> - {disconnected, NState, unhealthy_target}; + {error, undefined_table} -> + {disconnected, State, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn connecting @@ -238,8 +238,8 @@ do_get_status(Conn) -> do_check_prepares( #{ pool_name := PoolName, - prepare_statement := #{send_message := SQL} - } = State + templates := #{{send_message, prepstmt} := SQL} + } ) -> % it's already connected. Verify if target table still exists Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], @@ -250,7 +250,7 @@ do_check_prepares( {ok, Conn} -> case mysql:prepare(Conn, get_status, SQL) of {error, {1146, _, _}} -> - {error, {undefined_table, State}}; + {error, undefined_table}; {ok, Statement} -> mysql:unprepare(Conn, Statement); _ -> @@ -265,17 +265,14 @@ do_check_prepares( ok, Workers ); -do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) -> +do_check_prepares(#{prepares := ok}) -> ok; -do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) -> +do_check_prepares(#{prepares := {error, _}} = State) -> %% retry to prepare - case prepare_sql(Prepares, PoolName) of + case prepare_sql(State) of ok -> %% remove the error - {ok, State#{prepare_statement => Prepares}}; - {error, undefined_table} -> - %% indicate the error - {error, {undefined_table, State#{prepare_statement => {error, Prepares}}}}; + {ok, State#{prepares => ok}}; {error, Reason} -> {error, Reason} end. @@ -285,41 +282,44 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, connect(Options) -> mysql:start_link(Options). -init_prepare(State = #{prepare_statement := Prepares, pool_name := PoolName}) -> - case maps:size(Prepares) of +init_prepare(State = #{query_templates := Templates}) -> + case maps:size(Templates) of 0 -> - State; + State#{prepares => ok}; _ -> - case prepare_sql(Prepares, PoolName) of + case prepare_sql(State) of ok -> - State; + State#{prepares => ok}; {error, Reason} -> - LogMeta = #{msg => <<"mysql_init_prepare_statement_failed">>, reason => Reason}, - ?SLOG(error, LogMeta), + ?SLOG(error, #{ + msg => <<"MySQL init prepare statement failed">>, + reason => Reason + }), %% mark the prepare_statement as failed - State#{prepare_statement => {error, Prepares}} + State#{prepares => {error, Reason}} end end. -maybe_prepare_sql(SQLOrKey, Prepares, PoolName) -> - case maps:is_key(SQLOrKey, Prepares) of - true -> prepare_sql(Prepares, PoolName); +maybe_prepare_sql(SQLOrKey, State = #{query_templates := Templates}) -> + case maps:is_key({SQLOrKey, prepstmt}, Templates) of + true -> prepare_sql(State); false -> {error, {unrecoverable_error, prepared_statement_invalid}} end. -prepare_sql(Prepares, PoolName) when is_map(Prepares) -> - prepare_sql(maps:to_list(Prepares), PoolName); -prepare_sql(Prepares, PoolName) -> - case do_prepare_sql(Prepares, PoolName) of +prepare_sql(#{query_templates := Templates, pool_name := PoolName}) -> + prepare_sql(maps:to_list(Templates), PoolName). + +prepare_sql(Templates, PoolName) -> + case do_prepare_sql(Templates, PoolName) of ok -> %% prepare for reconnect - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}), ok; {error, R} -> {error, R} end. -do_prepare_sql(Prepares, PoolName) -> +do_prepare_sql(Templates, PoolName) -> Conns = [ begin @@ -328,33 +328,30 @@ do_prepare_sql(Prepares, PoolName) -> end || {_Name, Worker} <- ecpool:workers(PoolName) ], - prepare_sql_to_conn_list(Conns, Prepares). + prepare_sql_to_conn_list(Conns, Templates). -prepare_sql_to_conn_list([], _PrepareList) -> +prepare_sql_to_conn_list([], _Templates) -> ok; -prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> - case prepare_sql_to_conn(Conn, PrepareList) of +prepare_sql_to_conn_list([Conn | ConnList], Templates) -> + case prepare_sql_to_conn(Conn, Templates) of ok -> - prepare_sql_to_conn_list(ConnList, PrepareList); + prepare_sql_to_conn_list(ConnList, Templates); {error, R} -> %% rollback - Fun = fun({Key, _}) -> - _ = unprepare_sql_to_conn(Conn, Key), - ok - end, - lists:foreach(Fun, PrepareList), + _ = [unprepare_sql_to_conn(Conn, Template) || Template <- Templates], {error, R} end. -prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; -prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> - LogMeta = #{msg => "mysql_prepare_statement", name => Key, prepare_sql => SQL}, +prepare_sql_to_conn(_Conn, []) -> + ok; +prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) -> + LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL}, ?SLOG(info, LogMeta), _ = unprepare_sql_to_conn(Conn, Key), case mysql:prepare(Conn, Key, SQL) of {ok, _Key} -> ?SLOG(info, LogMeta#{result => success}), - prepare_sql_to_conn(Conn, PrepareList); + prepare_sql_to_conn(Conn, Rest); {error, {1146, _, _} = Reason} -> %% Target table is not created ?tp(mysql_undefined_table, #{}), @@ -365,84 +362,85 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> % syntax failures. Retrying syntax failures is not very productive. ?SLOG(error, LogMeta#{result => failed, reason => Reason}), {error, Reason} - end. + end; +prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) -> + prepare_sql_to_conn(Conn, Rest). -unprepare_sql_to_conn(Conn, PrepareSqlKey) -> - mysql:unprepare(Conn, PrepareSqlKey). +unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) -> + mysql:unprepare(Conn, Key); +unprepare_sql_to_conn(Conn, Key) when is_atom(Key) -> + mysql:unprepare(Conn, Key); +unprepare_sql_to_conn(_Conn, _) -> + ok. parse_prepare_sql(Config) -> - SQL = - case maps:get(prepare_statement, Config, undefined) of - undefined -> - case maps:get(sql, Config, undefined) of - undefined -> #{}; - Template -> #{send_message => Template} - end; - Any -> - Any + Queries = + case Config of + #{prepare_statement := Qs} -> + Qs; + #{sql := Query} -> + #{send_message => Query}; + _ -> + #{} end, - parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}). + Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), + #{query_templates => Templates}. -parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) -> - {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H), - parse_batch_prepare_sql( - L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks - ); -parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) -> - #{ - prepare_statement => Prepares, - params_tokens => Tokens, - batch_inserts => BatchInserts, - batch_params_tokens => BatchTks - }. +parse_prepare_sql(Key, Query, Acc) -> + Template = emqx_connector_template_sql:parse_prepstmt(Query, #{parameters => '?'}), + AccNext = Acc#{{Key, prepstmt} => Template}, + parse_batch_sql(Key, Query, AccNext). -parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) -> - case emqx_utils_sql:get_statement_type(H) of - select -> - parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); +parse_batch_sql(Key, Query, Acc) -> + case emqx_connector_sql:get_statement_type(Query) of insert -> - case emqx_utils_sql:parse_insert(H) of - {ok, {InsertSQL, Params}} -> - ParamsTks = emqx_placeholder:preproc_tmpl(Params), - parse_prepare_sql( - T, - Prepares, - Tokens, - BatchInserts#{Key => InsertSQL}, - BatchTks#{Key => ParamsTks} - ); + case emqx_connector_sql:parse_insert(Query) of + {ok, {Insert, Params}} -> + RowTemplate = emqx_connector_template_sql:parse(Params), + Acc#{{Key, batch} => {Insert, RowTemplate}}; {error, Reason} -> - ?SLOG(error, #{msg => "split_sql_failed", sql => H, reason => Reason}), - parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) + ?SLOG(error, #{ + msg => "parse insert sql statement failed", + sql => Query, + reason => Reason + }), + Acc end; - Type when is_atom(Type) -> - ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}), - parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); - {error, Reason} -> - ?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}), - parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) + select -> + Acc; + Otherwise -> + ?SLOG(error, #{ + msg => "invalid sql statement type", + sql => Query, + type => Otherwise + }), + Acc end. proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> - case maps:get(TypeOrKey, ParamsTokens, undefined) of +proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -> + case maps:get({TypeOrKey, prepstmt}, Templates, undefined) of undefined -> {SQLOrData, Params}; - Tokens -> - {TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)} + {_InsertPart, RowTemplate} -> + % NOTE: ignoring errors here, missing variables are set to `null`. + {Row, _Errors} = emqx_connector_template_sql:render_prepstmt(RowTemplate, SQLOrData), + {TypeOrKey, Row} end. -on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> - ValuesPart = lists:join($,, [ - emqx_placeholder:proc_param_str(Tokens, Msg, fun emqx_placeholder:quote_mysql/1) - || {_, Msg} <- BatchReqs - ]), - Query = [InsertPart, <<" values ">> | ValuesPart], +on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) -> + Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs], + Query = [InsertPart, <<" values ">> | lists:join($,, Rows)], on_sql_query(InstId, query, Query, no_params, default_timeout, State). +render_row(RowTemplate, Data) -> + % NOTE: ignoring errors here, missing variables are set to "NULL". + {Row, _Errors} = emqx_connector_template_sql:render(RowTemplate, Data, #{escaping => mysql}), + Row. + on_sql_query( InstId, SQLFunc, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index dc6447536..71ba93b9b 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -52,15 +52,12 @@ default_port => ?PGSQL_DEFAULT_PORT }). --type prepares() :: #{atom() => binary()}. --type params_tokens() :: #{atom() => list()}. - +-type template() :: {unicode:chardata(), emqx_connector_template_sql:row_template()}. -type state() :: #{ pool_name := binary(), - prepare_sql := prepares(), - params_tokens := params_tokens(), - prepare_statement := epgsql:statement() + query_templates := #{binary() => template()}, + prepares := #{binary() => epgsql:statement()} | {error, _} }. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' @@ -142,7 +139,7 @@ on_start( State = parse_prepare_sql(Config), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; + {ok, init_prepare(State#{pool_name => InstId, prepares => #{}})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -189,55 +186,50 @@ pgsql_query_type(_) -> on_batch_query( InstId, - BatchReq, - #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State + [{Key, _} = Request | _] = BatchReq, + #{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State ) -> - case BatchReq of - [{Key, _} = Request | _] -> - BinKey = to_bin(Key), - case maps:get(BinKey, Tokens, undefined) of - undefined -> - Log = #{ - connector => InstId, - first_request => Request, - state => State, - msg => "batch_prepare_not_implemented" - }, - ?SLOG(error, Log), - {error, {unrecoverable_error, batch_prepare_not_implemented}}; - TokenList -> - {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], - St = maps:get(BinKey, Sts), - case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of - {error, _Error} = Result -> - handle_result(Result); - {_Column, Results} -> - handle_batch_result(Results, 0) - end - end; - _ -> + BinKey = to_bin(Key), + case maps:get(BinKey, Templates, undefined) of + undefined -> Log = #{ connector => InstId, - request => BatchReq, + first_request => Request, state => State, - msg => "invalid_request" + msg => "batch prepare not implemented" }, ?SLOG(error, Log), - {error, {unrecoverable_error, invalid_request}} - end. + {error, {unrecoverable_error, batch_prepare_not_implemented}}; + {_Statement, RowTemplate} -> + PrepStatement = maps:get(BinKey, PrepStatements), + Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], + case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of + {error, _Error} = Result -> + handle_result(Result); + {_Column, Results} -> + handle_batch_result(Results, 0) + end + end; +on_batch_query(InstId, BatchReq, State) -> + ?SLOG(error, #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }), + {error, {unrecoverable_error, invalid_request}}. proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> +proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -> Key = to_bin(TypeOrKey), - case maps:get(Key, ParamsTokens, undefined) of + case maps:get(Key, Templates, undefined) of undefined -> {SQLOrData, Params}; - Tokens -> - {Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)} + {_Statement, RowTemplate} -> + {Key, render_prepare_sql_row(RowTemplate, SQLOrData)} end. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> @@ -297,9 +289,9 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; - {error, {undefined_table, NState}} -> + {error, undefined_table} -> %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target}; + {disconnected, State, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn connecting @@ -314,8 +306,8 @@ do_get_status(Conn) -> do_check_prepares( #{ pool_name := PoolName, - prepare_sql := #{<<"send_message">> := SQL} - } = State + query_templates := #{<<"send_message">> := {SQL, _RowTemplate}} + } ) -> WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], case validate_table_existence(WorkerPids, SQL) of @@ -324,19 +316,16 @@ do_check_prepares( {error, undefined_table} -> {error, {undefined_table, State}} end; -do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> +do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> ok; -do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> +do_check_prepares(#{prepares := {error, _}} = State) -> %% retry to prepare - case prepare_sql(Prepares, PoolName) of - {ok, Sts} -> + case prepare_sql(State) of + {ok, PrepStatements} -> %% remove the error - {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; - {error, undefined_table} -> - %% indicate the error - {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; - Error -> - {error, Error} + {ok, State#{prepares := PrepStatements}}; + {error, Reason} -> + {error, Reason} end. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}. @@ -426,69 +415,63 @@ conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). parse_prepare_sql(Config) -> - SQL = - case maps:get(prepare_statement, Config, undefined) of - undefined -> - case maps:get(sql, Config, undefined) of - undefined -> #{}; - Template -> #{<<"send_message">> => Template} - end; - Any -> - Any + Queries = + case Config of + #{prepare_statement := Qs} -> + Qs; + #{sql := Query} -> + #{<<"send_message">> => Query}; + #{} -> + #{} end, - parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), + #{query_templates => Templates}. -parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'), - parse_prepare_sql( - T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} - ); -parse_prepare_sql([], Prepares, Tokens) -> - #{ - prepare_sql => Prepares, - params_tokens => Tokens - }. +parse_prepare_sql(Key, Query, Acc) -> + Template = emqx_connector_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), + Acc#{Key => Template}. -init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> - case maps:size(Prepares) of - 0 -> - State; - _ -> - case prepare_sql(Prepares, PoolName) of - {ok, Sts} -> - State#{prepare_statement := Sts}; - Error -> - LogMsg = - maps:merge( - #{msg => <<"postgresql_init_prepare_statement_failed">>}, - translate_to_log_context(Error) - ), - ?SLOG(error, LogMsg), - %% mark the prepare_sql as failed - State#{prepare_sql => {error, Prepares}} - end +render_prepare_sql_row(RowTemplate, Data) -> + % NOTE: ignoring errors here, missing variables will be replaced with `null`. + {Row, _Errors} = emqx_connector_template_sql:render_prepstmt(RowTemplate, Data), + Row. + +init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> + State; +init_prepare(State = #{}) -> + case prepare_sql(State) of + {ok, PrepStatements} -> + State#{prepares => PrepStatements}; + Error -> + ?SLOG(error, maps:merge( + #{msg => <<"postgresql_init_prepare_statement_failed">>}, + translate_to_log_context(Error) + )), + %% mark the prepares failed + State#{prepares => Error} end. -prepare_sql(Prepares, PoolName) when is_map(Prepares) -> - prepare_sql(maps:to_list(Prepares), PoolName); -prepare_sql(Prepares, PoolName) -> - case do_prepare_sql(Prepares, PoolName) of +prepare_sql(#{query_templates := Templates, pool_name := PoolName}) -> + prepare_sql(maps:to_list(Templates), PoolName). + +prepare_sql(Templates, PoolName) -> + case do_prepare_sql(Templates, PoolName) of {ok, _Sts} = Ok -> %% prepare for reconnect - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}), Ok; Error -> Error end. -do_prepare_sql(Prepares, PoolName) -> - do_prepare_sql(ecpool:workers(PoolName), Prepares, #{}). +do_prepare_sql(Templates, PoolName) -> + do_prepare_sql(ecpool:workers(PoolName), Templates, #{}). -do_prepare_sql([{_Name, Worker} | T], Prepares, _LastSts) -> +do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), - case prepare_sql_to_conn(Conn, Prepares) of + case prepare_sql_to_conn(Conn, Templates) of {ok, Sts} -> - do_prepare_sql(T, Prepares, Sts); + do_prepare_sql(Rest, Templates, Sts); Error -> Error end; @@ -498,13 +481,14 @@ do_prepare_sql([], _Prepares, LastSts) -> prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, Prepares, #{}). -prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> - LogMeta = #{msg => "postgresql_prepare_statement", name => Key, prepare_sql => SQL}, +prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> + {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> + LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> - prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); {error, {error, error, _, undefined_table, _, _} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}),