feat(tpl): switch basic connectors to `emqx_connector_template`

Also avoid `filename:join/2` in HTTP connector since it's both OS specific
and an overkill.
This commit is contained in:
Andrew Mayorov 2023-04-18 15:34:38 +03:00
parent 28d55d72ca
commit 35902dc72d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
6 changed files with 280 additions and 324 deletions

View File

@ -479,61 +479,47 @@ preprocess_request(
} = Req
) ->
#{
method => emqx_placeholder:preproc_tmpl(to_bin(Method)),
path => emqx_placeholder:preproc_tmpl(Path),
body => maybe_preproc_tmpl(body, Req),
headers => wrap_auth_header(preproc_headers(Headers)),
method => parse_template(to_bin(Method)),
path => parse_template(Path),
body => maybe_parse_template(body, Req),
headers => parse_headers(Headers),
request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
max_retries => maps:get(max_retries, Req, 2)
}.
preproc_headers(Headers) when is_map(Headers) ->
parse_headers(Headers) when is_map(Headers) ->
maps:fold(
fun(K, V, Acc) ->
[
{
emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_placeholder:preproc_tmpl(to_bin(V))
}
| Acc
]
end,
fun(K, V, Acc) -> [parse_header(K, V) | Acc] end,
[],
Headers
);
preproc_headers(Headers) when is_list(Headers) ->
parse_headers(Headers) when is_list(Headers) ->
lists:map(
fun({K, V}) ->
{
emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_placeholder:preproc_tmpl(to_bin(V))
}
end,
fun({K, V}) -> parse_header(K, V) end,
Headers
).
wrap_auth_header(Headers) ->
lists:map(fun maybe_wrap_auth_header/1, Headers).
parse_header(K, V) ->
KStr = to_bin(K),
VTpl = parse_template(to_bin(V)),
{parse_template(KStr), maybe_wrap_auth_header(KStr, VTpl)}.
maybe_wrap_auth_header({[{str, Key}] = StrKey, Val}) ->
{_, MaybeWrapped} = maybe_wrap_auth_header({Key, Val}),
{StrKey, MaybeWrapped};
maybe_wrap_auth_header({Key, Val} = Header) when
is_binary(Key), (size(Key) =:= 19 orelse size(Key) =:= 13)
maybe_wrap_auth_header(Key, VTpl) when
(byte_size(Key) =:= 19 orelse byte_size(Key) =:= 13)
->
%% We check the size of potential keys in the guard above and consider only
%% those that match the number of characters of either "Authorization" or
%% "Proxy-Authorization".
case try_bin_to_lower(Key) of
<<"authorization">> ->
{Key, emqx_secret:wrap(Val)};
emqx_secret:wrap(VTpl);
<<"proxy-authorization">> ->
{Key, emqx_secret:wrap(Val)};
emqx_secret:wrap(VTpl);
_Other ->
Header
VTpl
end;
maybe_wrap_auth_header(Header) ->
Header.
maybe_wrap_auth_header(_Key, VTpl) ->
VTpl.
try_bin_to_lower(Bin) ->
try iolist_to_binary(string:lowercase(Bin)) of
@ -542,46 +528,57 @@ try_bin_to_lower(Bin) ->
_:_ -> Bin
end.
maybe_preproc_tmpl(Key, Conf) ->
maybe_parse_template(Key, Conf) ->
case maps:get(Key, Conf, undefined) of
undefined -> undefined;
Val -> emqx_placeholder:preproc_tmpl(Val)
Val -> parse_template(Val)
end.
parse_template(String) ->
emqx_connector_template:parse(String).
process_request(
#{
method := MethodTks,
path := PathTks,
body := BodyTks,
headers := HeadersTks,
method := MethodTemplate,
path := PathTemplate,
body := BodyTemplate,
headers := HeadersTemplate,
request_timeout := ReqTimeout
} = Conf,
Msg
) ->
Conf#{
method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)),
path => emqx_placeholder:proc_tmpl(PathTks, Msg),
body => process_request_body(BodyTks, Msg),
headers => proc_headers(HeadersTks, Msg),
method => make_method(render_template_string(MethodTemplate, Msg)),
path => unicode:characters_to_list(render_template(PathTemplate, Msg)),
body => render_request_body(BodyTemplate, Msg),
headers => render_headers(HeadersTemplate, Msg),
request_timeout => ReqTimeout
}.
process_request_body(undefined, Msg) ->
render_request_body(undefined, Msg) ->
emqx_utils_json:encode(Msg);
process_request_body(BodyTks, Msg) ->
emqx_placeholder:proc_tmpl(BodyTks, Msg).
render_request_body(BodyTks, Msg) ->
render_template(BodyTks, Msg).
proc_headers(HeaderTks, Msg) ->
render_headers(HeaderTks, Msg) ->
lists:map(
fun({K, V}) ->
{
emqx_placeholder:proc_tmpl(K, Msg),
emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg)
render_template_string(K, Msg),
render_template_string(emqx_secret:unwrap(V), Msg)
}
end,
HeaderTks
).
render_template(Template, Msg) ->
% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{String, _Errors} = emqx_connector_template:render(Template, Msg),
String.
render_template_string(Template, Msg) ->
unicode:characters_to_binary(render_template(Template, Msg)).
make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
@ -716,8 +713,6 @@ maybe_retry(Result, _Context, ReplyFunAndArgs) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% The HOCON schema system may generate sensitive keys with this format
is_sensitive_key([{str, StringKey}]) ->
is_sensitive_key(StringKey);
is_sensitive_key(Atom) when is_atom(Atom) ->
is_sensitive_key(erlang:atom_to_binary(Atom));
is_sensitive_key(Bin) when is_binary(Bin), (size(Bin) =:= 19 orelse size(Bin) =:= 13) ->
@ -742,25 +737,19 @@ redact(Data) ->
%% and we also can't know the body format and where the sensitive data will be
%% so the easy way to keep data security is redacted the whole body
redact_request({Path, Headers}) ->
{Path, redact(Headers)};
{Path, Headers};
redact_request({Path, Headers, _Body}) ->
{Path, redact(Headers), <<"******">>}.
{Path, Headers, <<"******">>}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
redact_test_() ->
TestData1 = [
{<<"content-type">>, <<"application/json">>},
{<<"Authorization">>, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>}
],
TestData2 = #{
headers =>
[
{[{str, <<"content-type">>}], [{str, <<"application/json">>}]},
{[{str, <<"Authorization">>}], [{str, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>}]}
]
TestData = #{
headers => [
{<<"content-type">>, <<"application/json">>},
{<<"Authorization">>, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>}
]
},
[
?_assert(is_sensitive_key(<<"Authorization">>)),
@ -770,8 +759,7 @@ redact_test_() ->
?_assert(is_sensitive_key('PrOxy-authoRizaTion')),
?_assertNot(is_sensitive_key(<<"Something">>)),
?_assertNot(is_sensitive_key(89)),
?_assertNotEqual(TestData1, redact(TestData1)),
?_assertNotEqual(TestData2, redact(TestData2))
?_assertNotEqual(TestData, redact(TestData))
].
join_paths_test_() ->

View File

@ -83,7 +83,8 @@ is_wrapped(Secret) when is_function(Secret) ->
is_wrapped(_Other) ->
false.
untmpl([{_, V} | _]) -> V.
untmpl(Tpl) ->
iolist_to_binary(emqx_connector_template:render_strict(Tpl, #{})).
is_unwrapped_headers(Headers) ->
lists:all(fun is_unwrapped_header/1, Headers).

View File

@ -565,8 +565,6 @@ t_simple_sql_query(Config) ->
ok.
t_missing_data(Config) ->
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
?assertMatch(
{ok, _},
create_bridge(Config)
@ -577,27 +575,13 @@ t_missing_data(Config) ->
),
send_message(Config, #{}),
{ok, [Event]} = snabbkaffe:receive_events(SRef),
case IsBatch of
true ->
?assertMatch(
#{
result :=
{error,
{unrecoverable_error,
{1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}
},
Event
);
false ->
?assertMatch(
#{
result :=
{error,
{unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
},
Event
)
end,
?assertMatch(
#{
result :=
{error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
},
Event
),
ok.
t_bad_sql_parameter(Config) ->

View File

@ -324,6 +324,7 @@ connect_and_drop_table(Config) ->
connect_and_clear_table(Config) ->
Con = connect_direct_pgsql(Config),
_ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
{ok, _} = epgsql:squery(Con, ?SQL_DELETE),
ok = epgsql:close(Con).
@ -668,7 +669,7 @@ t_missing_table(Config) ->
ok
end,
fun(Trace) ->
?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
ok
end
),

View File

@ -46,16 +46,12 @@
default_port => ?MYSQL_DEFAULT_PORT
}).
-type prepares() :: #{atom() => binary()}.
-type params_tokens() :: #{atom() => list()}.
-type sqls() :: #{atom() => binary()}.
-type template() :: {unicode:chardata(), emqx_connector_template:str()}.
-type state() ::
#{
pool_name := binary(),
prepare_statement := prepares(),
params_tokens := params_tokens(),
batch_inserts := sqls(),
batch_params_tokens := params_tokens()
prepares := ok | {error, _},
templates := #{{atom(), batch | prepstmt} => template()}
}.
%%=====================================================================
@ -154,13 +150,13 @@ on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
on_query(
InstId,
{TypeOrKey, SQLOrKey, Params, Timeout},
#{pool_name := PoolName, prepare_statement := Prepares} = State
State
) ->
MySqlFunction = mysql_function(TypeOrKey),
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
{error, not_prepared} ->
case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of
case maybe_prepare_sql(SQLOrKey2, State) of
ok ->
?tp(
mysql_connector_on_query_prepared_sql,
@ -187,23 +183,27 @@ on_query(
on_batch_query(
InstId,
BatchReq,
#{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
BatchReq = [{Key, _} | _],
#{query_templates := Templates} = State
) ->
case hd(BatchReq) of
{Key, _} ->
case maps:get(Key, Inserts, undefined) of
undefined ->
{error, {unrecoverable_error, batch_select_not_implemented}};
InsertSQL ->
Tokens = maps:get(Key, ParamsTokens),
on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
end;
Request ->
LogMeta = #{connector => InstId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}),
{error, {unrecoverable_error, invalid_request}}
end.
case maps:get({Key, batch}, Templates, undefined) of
undefined ->
{error, {unrecoverable_error, batch_select_not_implemented}};
Template ->
on_batch_insert(InstId, BatchReq, Template, State)
end;
on_batch_query(
InstId,
BatchReq,
State
) ->
?SLOG(error, #{
msg => "invalid request",
connector => InstId,
request => BatchReq,
state => State
}),
{error, {unrecoverable_error, invalid_request}}.
mysql_function(sql) ->
query;
@ -222,8 +222,8 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
{error, {undefined_table, NState}} ->
{disconnected, NState, unhealthy_target};
{error, undefined_table} ->
{disconnected, State, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting
@ -238,8 +238,8 @@ do_get_status(Conn) ->
do_check_prepares(
#{
pool_name := PoolName,
prepare_statement := #{send_message := SQL}
} = State
templates := #{{send_message, prepstmt} := SQL}
}
) ->
% it's already connected. Verify if target table still exists
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
@ -250,7 +250,7 @@ do_check_prepares(
{ok, Conn} ->
case mysql:prepare(Conn, get_status, SQL) of
{error, {1146, _, _}} ->
{error, {undefined_table, State}};
{error, undefined_table};
{ok, Statement} ->
mysql:unprepare(Conn, Statement);
_ ->
@ -265,17 +265,14 @@ do_check_prepares(
ok,
Workers
);
do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) ->
do_check_prepares(#{prepares := ok}) ->
ok;
do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) ->
do_check_prepares(#{prepares := {error, _}} = State) ->
%% retry to prepare
case prepare_sql(Prepares, PoolName) of
case prepare_sql(State) of
ok ->
%% remove the error
{ok, State#{prepare_statement => Prepares}};
{error, undefined_table} ->
%% indicate the error
{error, {undefined_table, State#{prepare_statement => {error, Prepares}}}};
{ok, State#{prepares => ok}};
{error, Reason} ->
{error, Reason}
end.
@ -285,41 +282,44 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error,
connect(Options) ->
mysql:start_link(Options).
init_prepare(State = #{prepare_statement := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
init_prepare(State = #{query_templates := Templates}) ->
case maps:size(Templates) of
0 ->
State;
State#{prepares => ok};
_ ->
case prepare_sql(Prepares, PoolName) of
case prepare_sql(State) of
ok ->
State;
State#{prepares => ok};
{error, Reason} ->
LogMeta = #{msg => <<"mysql_init_prepare_statement_failed">>, reason => Reason},
?SLOG(error, LogMeta),
?SLOG(error, #{
msg => <<"MySQL init prepare statement failed">>,
reason => Reason
}),
%% mark the prepare_statement as failed
State#{prepare_statement => {error, Prepares}}
State#{prepares => {error, Reason}}
end
end.
maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
case maps:is_key(SQLOrKey, Prepares) of
true -> prepare_sql(Prepares, PoolName);
maybe_prepare_sql(SQLOrKey, State = #{query_templates := Templates}) ->
case maps:is_key({SQLOrKey, prepstmt}, Templates) of
true -> prepare_sql(State);
false -> {error, {unrecoverable_error, prepared_statement_invalid}}
end.
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(Prepares, PoolName) ->
case do_prepare_sql(Prepares, PoolName) of
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
prepare_sql(maps:to_list(Templates), PoolName).
prepare_sql(Templates, PoolName) ->
case do_prepare_sql(Templates, PoolName) of
ok ->
%% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
ok;
{error, R} ->
{error, R}
end.
do_prepare_sql(Prepares, PoolName) ->
do_prepare_sql(Templates, PoolName) ->
Conns =
[
begin
@ -328,33 +328,30 @@ do_prepare_sql(Prepares, PoolName) ->
end
|| {_Name, Worker} <- ecpool:workers(PoolName)
],
prepare_sql_to_conn_list(Conns, Prepares).
prepare_sql_to_conn_list(Conns, Templates).
prepare_sql_to_conn_list([], _PrepareList) ->
prepare_sql_to_conn_list([], _Templates) ->
ok;
prepare_sql_to_conn_list([Conn | ConnList], PrepareList) ->
case prepare_sql_to_conn(Conn, PrepareList) of
prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
case prepare_sql_to_conn(Conn, Templates) of
ok ->
prepare_sql_to_conn_list(ConnList, PrepareList);
prepare_sql_to_conn_list(ConnList, Templates);
{error, R} ->
%% rollback
Fun = fun({Key, _}) ->
_ = unprepare_sql_to_conn(Conn, Key),
ok
end,
lists:foreach(Fun, PrepareList),
_ = [unprepare_sql_to_conn(Conn, Template) || Template <- Templates],
{error, R}
end.
prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok;
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
LogMeta = #{msg => "mysql_prepare_statement", name => Key, prepare_sql => SQL},
prepare_sql_to_conn(_Conn, []) ->
ok;
prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL},
?SLOG(info, LogMeta),
_ = unprepare_sql_to_conn(Conn, Key),
case mysql:prepare(Conn, Key, SQL) of
{ok, _Key} ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList);
prepare_sql_to_conn(Conn, Rest);
{error, {1146, _, _} = Reason} ->
%% Target table is not created
?tp(mysql_undefined_table, #{}),
@ -365,84 +362,85 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
% syntax failures. Retrying syntax failures is not very productive.
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, Reason}
end.
end;
prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
prepare_sql_to_conn(Conn, Rest).
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
mysql:unprepare(Conn, PrepareSqlKey).
unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
mysql:unprepare(Conn, Key);
unprepare_sql_to_conn(Conn, Key) when is_atom(Key) ->
mysql:unprepare(Conn, Key);
unprepare_sql_to_conn(_Conn, _) ->
ok.
parse_prepare_sql(Config) ->
SQL =
case maps:get(prepare_statement, Config, undefined) of
undefined ->
case maps:get(sql, Config, undefined) of
undefined -> #{};
Template -> #{send_message => Template}
end;
Any ->
Any
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
#{send_message => Query};
_ ->
#{}
end,
parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
#{query_templates => Templates}.
parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H),
parse_batch_prepare_sql(
L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
);
parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
#{
prepare_statement => Prepares,
params_tokens => Tokens,
batch_inserts => BatchInserts,
batch_params_tokens => BatchTks
}.
parse_prepare_sql(Key, Query, Acc) ->
Template = emqx_connector_template_sql:parse_prepstmt(Query, #{parameters => '?'}),
AccNext = Acc#{{Key, prepstmt} => Template},
parse_batch_sql(Key, Query, AccNext).
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
case emqx_utils_sql:get_statement_type(H) of
select ->
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
parse_batch_sql(Key, Query, Acc) ->
case emqx_connector_sql:get_statement_type(Query) of
insert ->
case emqx_utils_sql:parse_insert(H) of
{ok, {InsertSQL, Params}} ->
ParamsTks = emqx_placeholder:preproc_tmpl(Params),
parse_prepare_sql(
T,
Prepares,
Tokens,
BatchInserts#{Key => InsertSQL},
BatchTks#{Key => ParamsTks}
);
case emqx_connector_sql:parse_insert(Query) of
{ok, {Insert, Params}} ->
RowTemplate = emqx_connector_template_sql:parse(Params),
Acc#{{Key, batch} => {Insert, RowTemplate}};
{error, Reason} ->
?SLOG(error, #{msg => "split_sql_failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
?SLOG(error, #{
msg => "parse insert sql statement failed",
sql => Query,
reason => Reason
}),
Acc
end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
{error, Reason} ->
?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
select ->
Acc;
Otherwise ->
?SLOG(error, #{
msg => "invalid sql statement type",
sql => Query,
type => Otherwise
}),
Acc
end.
proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
case maps:get(TypeOrKey, ParamsTokens, undefined) of
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
case maps:get({TypeOrKey, prepstmt}, Templates, undefined) of
undefined ->
{SQLOrData, Params};
Tokens ->
{TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
{_InsertPart, RowTemplate} ->
% NOTE: ignoring errors here, missing variables are set to `null`.
{Row, _Errors} = emqx_connector_template_sql:render_prepstmt(RowTemplate, SQLOrData),
{TypeOrKey, Row}
end.
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
ValuesPart = lists:join($,, [
emqx_placeholder:proc_param_str(Tokens, Msg, fun emqx_placeholder:quote_mysql/1)
|| {_, Msg} <- BatchReqs
]),
Query = [InsertPart, <<" values ">> | ValuesPart],
on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],
Query = [InsertPart, <<" values ">> | lists:join($,, Rows)],
on_sql_query(InstId, query, Query, no_params, default_timeout, State).
render_row(RowTemplate, Data) ->
% NOTE: ignoring errors here, missing variables are set to "NULL".
{Row, _Errors} = emqx_connector_template_sql:render(RowTemplate, Data, #{escaping => mysql}),
Row.
on_sql_query(
InstId,
SQLFunc,

View File

@ -52,15 +52,12 @@
default_port => ?PGSQL_DEFAULT_PORT
}).
-type prepares() :: #{atom() => binary()}.
-type params_tokens() :: #{atom() => list()}.
-type template() :: {unicode:chardata(), emqx_connector_template_sql:row_template()}.
-type state() ::
#{
pool_name := binary(),
prepare_sql := prepares(),
params_tokens := params_tokens(),
prepare_statement := epgsql:statement()
query_templates := #{binary() => template()},
prepares := #{binary() => epgsql:statement()} | {error, _}
}.
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
@ -142,7 +139,7 @@ on_start(
State = parse_prepare_sql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
{ok, init_prepare(State#{pool_name => InstId, prepares => #{}})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@ -189,55 +186,50 @@ pgsql_query_type(_) ->
on_batch_query(
InstId,
BatchReq,
#{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
[{Key, _} = Request | _] = BatchReq,
#{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State
) ->
case BatchReq of
[{Key, _} = Request | _] ->
BinKey = to_bin(Key),
case maps:get(BinKey, Tokens, undefined) of
undefined ->
Log = #{
connector => InstId,
first_request => Request,
state => State,
msg => "batch_prepare_not_implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
{error, _Error} = Result ->
handle_result(Result);
{_Column, Results} ->
handle_batch_result(Results, 0)
end
end;
_ ->
BinKey = to_bin(Key),
case maps:get(BinKey, Templates, undefined) of
undefined ->
Log = #{
connector => InstId,
request => BatchReq,
first_request => Request,
state => State,
msg => "invalid_request"
msg => "batch prepare not implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, invalid_request}}
end.
{error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} ->
PrepStatement = maps:get(BinKey, PrepStatements),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
{error, _Error} = Result ->
handle_result(Result);
{_Column, Results} ->
handle_batch_result(Results, 0)
end
end;
on_batch_query(InstId, BatchReq, State) ->
?SLOG(error, #{
connector => InstId,
request => BatchReq,
state => State,
msg => "invalid request"
}),
{error, {unrecoverable_error, invalid_request}}.
proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
Key = to_bin(TypeOrKey),
case maps:get(Key, ParamsTokens, undefined) of
case maps:get(Key, Templates, undefined) of
undefined ->
{SQLOrData, Params};
Tokens ->
{Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
{_Statement, RowTemplate} ->
{Key, render_prepare_sql_row(RowTemplate, SQLOrData)}
end.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
@ -297,9 +289,9 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
{error, {undefined_table, NState}} ->
{error, undefined_table} ->
%% return new state indicating that we are connected but the target table is not created
{disconnected, NState, unhealthy_target};
{disconnected, State, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting
@ -314,8 +306,8 @@ do_get_status(Conn) ->
do_check_prepares(
#{
pool_name := PoolName,
prepare_sql := #{<<"send_message">> := SQL}
} = State
query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
}
) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
case validate_table_existence(WorkerPids, SQL) of
@ -324,19 +316,16 @@ do_check_prepares(
{error, undefined_table} ->
{error, {undefined_table, State}}
end;
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
do_check_prepares(#{prepares := {error, _}} = State) ->
%% retry to prepare
case prepare_sql(Prepares, PoolName) of
{ok, Sts} ->
case prepare_sql(State) of
{ok, PrepStatements} ->
%% remove the error
{ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
{error, undefined_table} ->
%% indicate the error
{error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
Error ->
{error, Error}
{ok, State#{prepares := PrepStatements}};
{error, Reason} ->
{error, Reason}
end.
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
@ -426,69 +415,63 @@ conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
parse_prepare_sql(Config) ->
SQL =
case maps:get(prepare_statement, Config, undefined) of
undefined ->
case maps:get(sql, Config, undefined) of
undefined -> #{};
Template -> #{<<"send_message">> => Template}
end;
Any ->
Any
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
#{<<"send_message">> => Query};
#{} ->
#{}
end,
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
#{query_templates => Templates}.
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'),
parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);
parse_prepare_sql([], Prepares, Tokens) ->
#{
prepare_sql => Prepares,
params_tokens => Tokens
}.
parse_prepare_sql(Key, Query, Acc) ->
Template = emqx_connector_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
Acc#{Key => Template}.
init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
0 ->
State;
_ ->
case prepare_sql(Prepares, PoolName) of
{ok, Sts} ->
State#{prepare_statement := Sts};
Error ->
LogMsg =
maps:merge(
#{msg => <<"postgresql_init_prepare_statement_failed">>},
translate_to_log_context(Error)
),
?SLOG(error, LogMsg),
%% mark the prepare_sql as failed
State#{prepare_sql => {error, Prepares}}
end
render_prepare_sql_row(RowTemplate, Data) ->
% NOTE: ignoring errors here, missing variables will be replaced with `null`.
{Row, _Errors} = emqx_connector_template_sql:render_prepstmt(RowTemplate, Data),
Row.
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
State;
init_prepare(State = #{}) ->
case prepare_sql(State) of
{ok, PrepStatements} ->
State#{prepares => PrepStatements};
Error ->
?SLOG(error, maps:merge(
#{msg => <<"postgresql_init_prepare_statement_failed">>},
translate_to_log_context(Error)
)),
%% mark the prepares failed
State#{prepares => Error}
end.
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(Prepares, PoolName) ->
case do_prepare_sql(Prepares, PoolName) of
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
prepare_sql(maps:to_list(Templates), PoolName).
prepare_sql(Templates, PoolName) ->
case do_prepare_sql(Templates, PoolName) of
{ok, _Sts} = Ok ->
%% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
Ok;
Error ->
Error
end.
do_prepare_sql(Prepares, PoolName) ->
do_prepare_sql(ecpool:workers(PoolName), Prepares, #{}).
do_prepare_sql(Templates, PoolName) ->
do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
do_prepare_sql([{_Name, Worker} | T], Prepares, _LastSts) ->
do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker),
case prepare_sql_to_conn(Conn, Prepares) of
case prepare_sql_to_conn(Conn, Templates) of
{ok, Sts} ->
do_prepare_sql(T, Prepares, Sts);
do_prepare_sql(Rest, Templates, Sts);
Error ->
Error
end;
@ -498,13 +481,14 @@ do_prepare_sql([], _Prepares, LastSts) ->
prepare_sql_to_conn(Conn, Prepares) ->
prepare_sql_to_conn(Conn, Prepares, #{}).
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
LogMeta = #{msg => "postgresql_prepare_statement", name => Key, prepare_sql => SQL},
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
{ok, Statements};
prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, sql => SQL},
?SLOG(info, LogMeta),
case epgsql:parse2(Conn, Key, SQL, []) of
{ok, Statement} ->
prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement});
{error, {error, error, _, undefined_table, _, _} = Error} ->
%% Target table is not created
?tp(pgsql_undefined_table, #{}),