feat(postgres): add `disable_prepared_statements` option

Fixes https://emqx.atlassian.net/browse/EMQX-12496

Some Postgres connections, such ones made to [PGBouncer](https://www.pgbouncer.org/) or
[Supabase in Transaction Mode](https://supabase.com/), do not support some session
features like prepared statements.
This commit is contained in:
Thales Macedo Garitezi 2024-06-03 14:54:44 -03:00
parent b6ff67d712
commit c07bc68e6f
16 changed files with 179 additions and 38 deletions

View File

@ -606,6 +606,7 @@ pgsql_server() ->
pgsql_config() ->
#{
auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>,
username => <<"root">>,
password => <<"public">>,

View File

@ -426,6 +426,7 @@ setup_config(SpecialParams) ->
pgsql_config() ->
#{
auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>,
username => <<"root">>,
password => <<"public">>,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{applications, [
kernel,

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
schema_module/0,
connector_action_config_to_bridge_v1_config/2
]).
bridge_v1_type_name() -> matrix.
@ -20,3 +21,9 @@ action_type_name() -> matrix.
connector_type_name() -> matrix.
schema_module() -> emqx_bridge_matrix.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,

View File

@ -82,6 +82,7 @@ fields("get_bridge_v2") ->
fields("post_bridge_v2") ->
fields("post", pgsql, pgsql_action);
fields("config") ->
%% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
[
{enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{sql,
@ -95,8 +96,11 @@ fields("config") ->
#{desc => ?DESC("local_topic"), default => undefined}
)}
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_postgresql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
proplists:delete(
disable_prepared_statements,
emqx_postgresql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()
);
fields("post") ->
fields("post", ?ACTION_TYPE, "config");
fields("put") ->

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
schema_module/0,
connector_action_config_to_bridge_v1_config/2
]).
bridge_v1_type_name() -> pgsql.
@ -20,3 +21,20 @@ action_type_name() -> pgsql.
connector_type_name() -> pgsql.
schema_module() -> emqx_bridge_pgsql.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
),
maps:with(bridge_v1_fields(), Config0).
%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------
bridge_v1_fields() ->
[
emqx_utils_conv:bin(K)
|| {K, _V} <- emqx_bridge_pgsql:fields("config")
].

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, pgsql).
-define(BRIDGE_TYPE_BIN, <<"pgsql">>).
@ -33,7 +34,18 @@
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
All0 = emqx_common_test_helpers:all(?MODULE),
All = All0 -- matrix_cases(),
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
Groups ++ All.
matrix_cases() ->
[
t_disable_prepared_statements
].
groups() ->
emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
init_per_suite(Config) ->
PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
@ -80,10 +92,26 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
init_per_group(Group, Config) when
Group =:= postgres;
Group =:= timescale;
Group =:= matrix
->
[
{bridge_type, group_to_type(Group)},
{connector_type, group_to_type(Group)}
| Config
];
init_per_group(_Group, Config) ->
Config.
common_init_per_testcase(TestCase, Config) ->
group_to_type(postgres) -> pgsql;
group_to_type(Group) -> Group.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(),
@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) ->
BridgeConfig = bridge_config(Name, Name),
ok = snabbkaffe:start_trace(),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
{bridge_name, Name},
{bridge_config, BridgeConfig}
| NConfig
@ -232,3 +260,20 @@ t_sync_query(Config) ->
t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok.
t_disable_prepared_statements(matrix) ->
[[postgres], [timescale], [matrix]];
t_disable_prepared_statements(Config0) ->
ConnectorConfig0 = ?config(connector_config, Config0),
ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
postgres_bridge_connector_on_query_return
),
ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource]},
{env, [

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
schema_module/0,
connector_action_config_to_bridge_v1_config/2
]).
bridge_v1_type_name() -> timescale.
@ -20,3 +21,9 @@ action_type_name() -> timescale.
connector_type_name() -> timescale.
schema_module() -> emqx_bridge_timescale.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,

View File

@ -50,6 +50,8 @@
execute_batch/3
]).
-export([disable_prepared_statements/0]).
%% for ecpool workers usage
-export([do_get_status/1, prepare_sql_to_conn/2]).
@ -62,7 +64,7 @@
#{
pool_name := binary(),
query_templates := #{binary() => template()},
prepares := #{binary() => epgsql:statement()} | {error, _}
prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
}.
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
@ -78,7 +80,10 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
[{server, server()}] ++
[
{server, server()},
disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields() ++
emqx_connector_schema_lib:prepare_statement_fields().
@ -87,6 +92,17 @@ server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
disable_prepared_statements() ->
{disable_prepared_statements,
hoconsc:mk(
boolean(),
#{
default => false,
required => false,
desc => ?DESC("disable_prepared_statements")
}
)}.
adjust_fields(Fields) ->
lists:map(
fun
@ -108,6 +124,7 @@ on_start(
InstId,
#{
server := Server,
disable_prepared_statements := DisablePreparedStatements,
database := DB,
username := User,
pool_size := PoolSize,
@ -143,11 +160,16 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
State1 = parse_prepare_sql(Config, <<"send_message">>),
State1 = parse_sql_template(Config, <<"send_message">>),
State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
Prepares =
case DisablePreparedStatements of
true -> disabled;
false -> #{}
end,
{ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@ -209,13 +231,17 @@ on_add_channel(
create_channel_state(
ChannelId,
#{pool_name := PoolName} = _ConnectorState,
#{
pool_name := PoolName,
prepares := Prepares
} = _ConnectorState,
#{parameters := Parameters} = _ChannelConfig
) ->
State1 = parse_prepare_sql(Parameters, ChannelId),
State1 = parse_sql_template(Parameters, ChannelId),
{ok,
init_prepare(State1#{
pool_name => PoolName,
prepares => Prepares,
prepare_statement => #{}
})}.
@ -233,6 +259,8 @@ on_remove_channel(
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
ok;
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_prepared_statement(WorkerPids, ChannelId, State),
@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% prepared statement doesn't exist.
try ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State),
Statement = get_templated_statement(ChannelId, State),
_ = epgsql:close(Conn, Statement),
close_prepared_statement(Rest, ChannelId, State);
_ ->
@ -303,21 +331,23 @@ on_query(
sql => NameOrSQL,
state => State
}),
Type = pgsql_query_type(TypeOrKey),
Type = pgsql_query_type(TypeOrKey, State),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
handle_result(Res).
pgsql_query_type(sql) ->
pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
query;
pgsql_query_type(query) ->
pgsql_query_type(sql, _ConnectorState) ->
query;
pgsql_query_type(prepared_query) ->
pgsql_query_type(query, _ConnectorState) ->
query;
pgsql_query_type(prepared_query, _ConnectorState) ->
prepared_query;
%% for bridge
pgsql_query_type(_) ->
pgsql_query_type(prepared_query).
pgsql_query_type(_, ConnectorState) ->
pgsql_query_type(prepared_query, ConnectorState).
on_batch_query(
InstId,
@ -336,9 +366,9 @@ on_batch_query(
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} ->
PrepStatement = get_prepared_statement(BinKey, State),
StatementTemplate = get_templated_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of
case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
{error, _Error} = Result ->
handle_result(Result);
{_Column, Results} ->
@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
BinKey = to_bin(TypeOrKey),
case get_template(BinKey, State) of
undefined ->
{SQLOrData, Params};
{_Statement, RowTemplate} ->
{BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
{Statement, RowTemplate} ->
Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
case DisablePreparedStatements of
true ->
{Statement, Rendered};
false ->
{BinKey, Rendered}
end
end.
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) ->
BinKey = to_bin(Key),
maps:get(BinKey, Templates, undefined).
get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
get_templated_statement(Key, #{installed_channels := Channels} = _State) when
is_map_key(Key, Channels)
->
BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels),
ChannelPreparedStatements = maps:get(prepares, ChannelState),
maps:get(BinKey, ChannelPreparedStatements);
get_prepared_statement(Key, #{prepares := PrepStatements}) ->
get_templated_statement(Key, #{prepares := PrepStatements}) ->
BinKey = to_bin(Key),
maps:get(BinKey, PrepStatements).
@ -480,6 +517,8 @@ do_check_prepares(
{error, Reason} ->
{error, Reason}
end;
do_check_prepares(#{prepares := disabled}) ->
ok;
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(#{prepares := {error, _}} = State) ->
@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
parse_prepare_sql(Config, SQLID) ->
parse_sql_template(Config, SQLID) ->
Queries =
case Config of
#{prepare_statement := Qs} ->
@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) ->
#{} ->
#{}
end,
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
#{query_templates => Templates}.
parse_prepare_sql(Key, Query, Acc) ->
parse_sql_template(Key, Query, Acc) ->
Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
Acc#{Key => Template}.
@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) ->
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
Row.
init_prepare(State = #{prepares := disabled}) ->
State;
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
State;
init_prepare(State = #{}) ->

View File

@ -47,7 +47,10 @@ roots() ->
[].
fields("connection_fields") ->
[{server, server()}] ++
[
{server, server()},
emqx_postgresql:disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields();
fields("config_connector") ->

View File

@ -0,0 +1,3 @@
Added the `disable_prepared_statements` option for Postgres-based connectors.
This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode.

View File

@ -14,4 +14,13 @@ config_connector.desc:
config_connector.label:
"""PostgreSQL Connector Config"""
disable_prepared_statements.label:
"""Disable Prepared Statements"""
disable_prepared_statements.desc:
"""~
Disables the usage of prepared statements in the connections.
Some endpoints, like PGBouncer or Supabase in Transaction mode, do not
support session features such as prepared statements. For such connections,
this option should be enabled.~"""
}

View File

@ -49,6 +49,7 @@ NIF
OCSP
OTP
PEM
PGBouncer
PINGREQ
PSK
PSK
@ -65,6 +66,7 @@ Riak
SHA
SMS
Struct
Supabase
TCP
TLS
TTL