Merge pull request #13175 from thalesmg/disable-prepared-statements-postgres-r57-20240603

feat(postgres): add `disable_prepared_statements` option
This commit is contained in:
Thales Macedo Garitezi 2024-06-04 12:29:53 -03:00 committed by GitHub
commit 5eab221f7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 179 additions and 38 deletions

View File

@ -606,6 +606,7 @@ pgsql_server() ->
pgsql_config() -> pgsql_config() ->
#{ #{
auto_reconnect => true, auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>, database => <<"mqtt">>,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"public">>,

View File

@ -426,6 +426,7 @@ setup_config(SpecialParams) ->
pgsql_config() -> pgsql_config() ->
#{ #{
auto_reconnect => true, auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>, database => <<"mqtt">>,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"public">>,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_matrix, [ {application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"}, {description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> matrix. bridge_v1_type_name() -> matrix.
@ -20,3 +21,9 @@ action_type_name() -> matrix.
connector_type_name() -> matrix. connector_type_name() -> matrix.
schema_module() -> emqx_bridge_matrix. schema_module() -> emqx_bridge_matrix.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [ {application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"}, {description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -82,6 +82,7 @@ fields("get_bridge_v2") ->
fields("post_bridge_v2") -> fields("post_bridge_v2") ->
fields("post", pgsql, pgsql_action); fields("post", pgsql, pgsql_action);
fields("config") -> fields("config") ->
%% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
[ [
{enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{sql, {sql,
@ -95,8 +96,11 @@ fields("config") ->
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)} )}
] ++ emqx_resource_schema:fields("resource_opts") ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_postgresql:fields(config) -- proplists:delete(
emqx_connector_schema_lib:prepare_statement_fields()); disable_prepared_statements,
emqx_postgresql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()
);
fields("post") -> fields("post") ->
fields("post", ?ACTION_TYPE, "config"); fields("post", ?ACTION_TYPE, "config");
fields("put") -> fields("put") ->

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> pgsql. bridge_v1_type_name() -> pgsql.
@ -20,3 +21,20 @@ action_type_name() -> pgsql.
connector_type_name() -> pgsql. connector_type_name() -> pgsql.
schema_module() -> emqx_bridge_pgsql. schema_module() -> emqx_bridge_pgsql.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
),
maps:with(bridge_v1_fields(), Config0).
%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------
bridge_v1_fields() ->
[
emqx_utils_conv:bin(K)
|| {K, _V} <- emqx_bridge_pgsql:fields("config")
].

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, pgsql). -define(BRIDGE_TYPE, pgsql).
-define(BRIDGE_TYPE_BIN, <<"pgsql">>). -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
@ -33,7 +34,18 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). All0 = emqx_common_test_helpers:all(?MODULE),
All = All0 -- matrix_cases(),
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
Groups ++ All.
matrix_cases() ->
[
t_disable_prepared_statements
].
groups() ->
emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
init_per_suite(Config) -> init_per_suite(Config) ->
PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
@ -80,10 +92,26 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps), emqx_cth_suite:stop(Apps),
ok. ok.
init_per_testcase(TestCase, Config) -> init_per_group(Group, Config) when
common_init_per_testcase(TestCase, Config). Group =:= postgres;
Group =:= timescale;
Group =:= matrix
->
[
{bridge_type, group_to_type(Group)},
{connector_type, group_to_type(Group)}
| Config
];
init_per_group(_Group, Config) ->
Config.
common_init_per_testcase(TestCase, Config) -> group_to_type(postgres) -> pgsql;
group_to_type(Group) -> Group.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)), ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(), emqx_config:delete_override_conf_files(),
@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) ->
BridgeConfig = bridge_config(Name, Name), BridgeConfig = bridge_config(Name, Name),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[ [
{connector_type, ?CONNECTOR_TYPE}, {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
{connector_name, Name}, {connector_name, Name},
{connector_config, ConnectorConfig}, {connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE}, {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
{bridge_name, Name}, {bridge_name, Name},
{bridge_config, BridgeConfig} {bridge_config, BridgeConfig}
| NConfig | NConfig
@ -232,3 +260,20 @@ t_sync_query(Config) ->
t_start_action_or_source_with_disabled_connector(Config) -> t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok. ok.
t_disable_prepared_statements(matrix) ->
[[postgres], [timescale], [matrix]];
t_disable_prepared_statements(Config0) ->
ConnectorConfig0 = ?config(connector_config, Config0),
ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
postgres_bridge_connector_on_query_return
),
ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_timescale, [ {application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"}, {description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource]}, {applications, [kernel, stdlib, emqx_resource]},
{env, [ {env, [

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> timescale. bridge_v1_type_name() -> timescale.
@ -20,3 +21,9 @@ action_type_name() -> timescale.
connector_type_name() -> timescale. connector_type_name() -> timescale.
schema_module() -> emqx_bridge_timescale. schema_module() -> emqx_bridge_timescale.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -50,6 +50,8 @@
execute_batch/3 execute_batch/3
]). ]).
-export([disable_prepared_statements/0]).
%% for ecpool workers usage %% for ecpool workers usage
-export([do_get_status/1, prepare_sql_to_conn/2]). -export([do_get_status/1, prepare_sql_to_conn/2]).
@ -62,7 +64,7 @@
#{ #{
pool_name := binary(), pool_name := binary(),
query_templates := #{binary() => template()}, query_templates := #{binary() => template()},
prepares := #{binary() => epgsql:statement()} | {error, _} prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
}. }.
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
@ -78,7 +80,10 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}]. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) -> fields(config) ->
[{server, server()}] ++ [
{server, server()},
disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:ssl_fields() ++
emqx_connector_schema_lib:prepare_statement_fields(). emqx_connector_schema_lib:prepare_statement_fields().
@ -87,6 +92,17 @@ server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
disable_prepared_statements() ->
{disable_prepared_statements,
hoconsc:mk(
boolean(),
#{
default => false,
required => false,
desc => ?DESC("disable_prepared_statements")
}
)}.
adjust_fields(Fields) -> adjust_fields(Fields) ->
lists:map( lists:map(
fun fun
@ -108,6 +124,7 @@ on_start(
InstId, InstId,
#{ #{
server := Server, server := Server,
disable_prepared_statements := DisablePreparedStatements,
database := DB, database := DB,
username := User, username := User,
pool_size := PoolSize, pool_size := PoolSize,
@ -143,11 +160,16 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
State1 = parse_prepare_sql(Config, <<"send_message">>), State1 = parse_sql_template(Config, <<"send_message">>),
State2 = State1#{installed_channels => #{}}, State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; Prepares =
case DisablePreparedStatements of
true -> disabled;
false -> #{}
end,
{ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
pgsql_connector_start_failed, pgsql_connector_start_failed,
@ -209,13 +231,17 @@ on_add_channel(
create_channel_state( create_channel_state(
ChannelId, ChannelId,
#{pool_name := PoolName} = _ConnectorState, #{
pool_name := PoolName,
prepares := Prepares
} = _ConnectorState,
#{parameters := Parameters} = _ChannelConfig #{parameters := Parameters} = _ChannelConfig
) -> ) ->
State1 = parse_prepare_sql(Parameters, ChannelId), State1 = parse_sql_template(Parameters, ChannelId),
{ok, {ok,
init_prepare(State1#{ init_prepare(State1#{
pool_name => PoolName, pool_name => PoolName,
prepares => Prepares,
prepare_statement => #{} prepare_statement => #{}
})}. })}.
@ -233,6 +259,8 @@ on_remove_channel(
NewState = OldState#{installed_channels => NewInstalledChannels}, NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}. {ok, NewState}.
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
ok;
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_prepared_statement(WorkerPids, ChannelId, State), close_prepared_statement(WorkerPids, ChannelId, State),
@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% prepared statement doesn't exist. %% prepared statement doesn't exist.
try ecpool_worker:client(WorkerPid) of try ecpool_worker:client(WorkerPid) of
{ok, Conn} -> {ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State), Statement = get_templated_statement(ChannelId, State),
_ = epgsql:close(Conn, Statement), _ = epgsql:close(Conn, Statement),
close_prepared_statement(Rest, ChannelId, State); close_prepared_statement(Rest, ChannelId, State);
_ -> _ ->
@ -303,21 +331,23 @@ on_query(
sql => NameOrSQL, sql => NameOrSQL,
state => State state => State
}), }),
Type = pgsql_query_type(TypeOrKey), Type = pgsql_query_type(TypeOrKey, State),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
handle_result(Res). handle_result(Res).
pgsql_query_type(sql) -> pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
query; query;
pgsql_query_type(query) -> pgsql_query_type(sql, _ConnectorState) ->
query; query;
pgsql_query_type(prepared_query) -> pgsql_query_type(query, _ConnectorState) ->
query;
pgsql_query_type(prepared_query, _ConnectorState) ->
prepared_query; prepared_query;
%% for bridge %% for bridge
pgsql_query_type(_) -> pgsql_query_type(_, ConnectorState) ->
pgsql_query_type(prepared_query). pgsql_query_type(prepared_query, ConnectorState).
on_batch_query( on_batch_query(
InstId, InstId,
@ -336,9 +366,9 @@ on_batch_query(
?SLOG(error, Log), ?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}}; {error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} -> {_Statement, RowTemplate} ->
PrepStatement = get_prepared_statement(BinKey, State), StatementTemplate = get_templated_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
{error, _Error} = Result -> {error, _Error} = Result ->
handle_result(Result); handle_result(Result);
{_Column, Results} -> {_Column, Results} ->
@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params}; {SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
BinKey = to_bin(TypeOrKey), BinKey = to_bin(TypeOrKey),
case get_template(BinKey, State) of case get_template(BinKey, State) of
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
{_Statement, RowTemplate} -> {Statement, RowTemplate} ->
{BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
case DisablePreparedStatements of
true ->
{Statement, Rendered};
false ->
{BinKey, Rendered}
end
end. end.
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
maps:get(BinKey, Templates, undefined). maps:get(BinKey, Templates, undefined).
get_prepared_statement(Key, #{installed_channels := Channels} = _State) when get_templated_statement(Key, #{installed_channels := Channels} = _State) when
is_map_key(Key, Channels) is_map_key(Key, Channels)
-> ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels), ChannelState = maps:get(BinKey, Channels),
ChannelPreparedStatements = maps:get(prepares, ChannelState), ChannelPreparedStatements = maps:get(prepares, ChannelState),
maps:get(BinKey, ChannelPreparedStatements); maps:get(BinKey, ChannelPreparedStatements);
get_prepared_statement(Key, #{prepares := PrepStatements}) -> get_templated_statement(Key, #{prepares := PrepStatements}) ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
maps:get(BinKey, PrepStatements). maps:get(BinKey, PrepStatements).
@ -480,6 +517,8 @@ do_check_prepares(
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end; end;
do_check_prepares(#{prepares := disabled}) ->
ok;
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
ok; ok;
do_check_prepares(#{prepares := {error, _}} = State) -> do_check_prepares(#{prepares := {error, _}} = State) ->
@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc). conn_opts(Opts, Acc).
parse_prepare_sql(Config, SQLID) -> parse_sql_template(Config, SQLID) ->
Queries = Queries =
case Config of case Config of
#{prepare_statement := Qs} -> #{prepare_statement := Qs} ->
@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) ->
#{} -> #{} ->
#{} #{}
end, end,
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
#{query_templates => Templates}. #{query_templates => Templates}.
parse_prepare_sql(Key, Query, Acc) -> parse_sql_template(Key, Query, Acc) ->
Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
Acc#{Key => Template}. Acc#{Key => Template}.
@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) ->
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}), {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
Row. Row.
init_prepare(State = #{prepares := disabled}) ->
State;
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
State; State;
init_prepare(State = #{}) -> init_prepare(State = #{}) ->

View File

@ -47,7 +47,10 @@ roots() ->
[]. [].
fields("connection_fields") -> fields("connection_fields") ->
[{server, server()}] ++ [
{server, server()},
emqx_postgresql:disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields("config_connector") -> fields("config_connector") ->

View File

@ -0,0 +1,3 @@
Added the `disable_prepared_statements` option for Postgres-based connectors.
This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode.

View File

@ -14,4 +14,13 @@ config_connector.desc:
config_connector.label: config_connector.label:
"""PostgreSQL Connector Config""" """PostgreSQL Connector Config"""
disable_prepared_statements.label:
"""Disable Prepared Statements"""
disable_prepared_statements.desc:
"""~
Disables the usage of prepared statements in the connections.
Some endpoints, like PGBouncer or Supabase in Transaction mode, do not
support session features such as prepared statements. For such connections,
this option should be enabled.~"""
} }

View File

@ -49,6 +49,7 @@ NIF
OCSP OCSP
OTP OTP
PEM PEM
PGBouncer
PINGREQ PINGREQ
PSK PSK
PSK PSK
@ -65,6 +66,7 @@ Riak
SHA SHA
SMS SMS
Struct Struct
Supabase
TCP TCP
TLS TLS
TTL TTL