fix(pgsql connector): handle prepared statement already exists

In a user's log file it was found that that the pgsql driver can end up
in a situation where the prepared statement for a channel/action is not
properly removed before a channel with the same name as the prepared
statement is added to the connector. This commit handles this by
attempting to remove the old prepared statement if one already exists
when adding channel.

Related issue:
https://emqx.atlassian.net/browse/EEC-1036
This commit is contained in:
Kjell Winblad 2024-06-03 16:14:26 +02:00
parent b14a138bf1
commit f2ccfff803
3 changed files with 108 additions and 6 deletions

View File

@ -715,6 +715,59 @@ t_missing_table(Config) ->
connect_and_create_table(Config), connect_and_create_table(Config),
ok. ok.
t_prepared_statement_exists(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
%% We should recover if the prepared statement name already exists in the
%% driver
?check_trace(
begin
?inject_crash(
#{?snk_kind := pgsql_fake_prepare_statement_exists},
snabbkaffe_nemesis:recover_after(1)
),
?assertMatch({ok, _}, create_bridge(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
#{status := Status} when Status == connected,
emqx_bridge_v2:health_check(BridgeType, Name)
)
),
ok
end,
fun(Trace) ->
?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
ok
end
),
%% We should get status disconnected if removing already existing statment don't help
?check_trace(
begin
?inject_crash(
#{?snk_kind := pgsql_fake_prepare_statement_exists},
snabbkaffe_nemesis:recover_after(30)
),
?assertMatch({ok, _}, create_bridge(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
#{status := Status} when Status == disconnected,
emqx_bridge_v2:health_check(BridgeType, Name)
)
),
snabbkaffe_nemesis:cleanup(),
ok
end,
fun(Trace) ->
?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
ok
end
),
ok.
t_table_removed(Config) -> t_table_removed(Config) ->
Name = ?config(pgsql_name, Config), Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config), BridgeType = ?config(pgsql_bridge_type, Config),

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -240,7 +240,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% We ignore errors since any error probably means that the %% We ignore errors since any error probably means that the
%% prepared statement doesn't exist. %% prepared statement doesn't exist. If it exists when we try
%% to insert one with the same name, we will try to remove it
%% again anyway.
try ecpool_worker:client(WorkerPid) of try ecpool_worker:client(WorkerPid) of
{ok, Conn} -> {ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State), Statement = get_prepared_statement(ChannelId, State),
@ -648,16 +650,21 @@ do_prepare_sql([], _Prepares, LastSts) ->
{ok, LastSts}. {ok, LastSts}.
prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, Prepares) ->
prepare_sql_to_conn(Conn, Prepares, #{}). prepare_sql_to_conn(Conn, Prepares, #{}, 0).
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
{ok, Statements}; {ok, Statements};
prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, 3) when is_pid(Conn) ->
{error, {failed_to_remove_prev_prepared_statement, Key}};
prepare_sql_to_conn(
Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
) when is_pid(Conn) ->
LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
?SLOG(info, LogMeta), ?SLOG(info, LogMeta),
test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL),
case epgsql:parse2(Conn, Key, SQL, []) of case epgsql:parse2(Conn, Key, SQL, []) of
{ok, Statement} -> {ok, Statement} ->
prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
{error, {error, error, _, undefined_table, _, _} = Error} -> {error, {error, error, _, undefined_table, _, _} = Error} ->
%% Target table is not created %% Target table is not created
?tp(pgsql_undefined_table, #{}), ?tp(pgsql_undefined_table, #{}),
@ -668,6 +675,29 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when
), ),
?SLOG(error, LogMsg), ?SLOG(error, LogMsg),
{error, undefined_table}; {error, undefined_table};
{error, {error, error, _, duplicate_prepared_statement, _, _}} = Error ->
?tp(pgsql_prepared_statement_exists, #{}),
LogMsg =
maps:merge(
LogMeta#{
msg => "postgresql_prepared_statment_with_same_name_already_exists",
explain => <<
"A prepared statement with the same name already "
"exists in the driver. Will attempt to remove the "
"previous prepared statement with the name and then "
"try again."
>>
},
translate_to_log_context(Error)
),
?SLOG(warning, LogMsg),
case epgsql:close(Conn, statement, Key) of
ok ->
?SLOG(info, #{msg => "pqsql_closed_statement_succefully"});
{error, Error} ->
?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => Error})
end,
prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
{error, Error} -> {error, Error} ->
TranslatedError = translate_to_log_context(Error), TranslatedError = translate_to_log_context(Error),
LogMsg = LogMsg =
@ -679,6 +709,25 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when
{error, export_error(TranslatedError)} {error, export_error(TranslatedError)}
end. end.
-ifdef(TEST).
test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL) ->
try
%% In test we inject a crash in the following trace point to test the
%% scenario when the prepared statement already exists. It is unknkown
%% in which scenario this can happen but it has been observed in a
%% production log file. See:
%% https://emqx.atlassian.net/browse/EEC-1036
?tp(pgsql_fake_prepare_statement_exists, #{})
catch
_:_ ->
epgsql:parse2(Conn, Key, SQL, [])
end,
ok.
-else.
test_maybe_inject_prepared_statement_already_exists(_Conn, _Key, _SQL) ->
ok.
-endif.
to_bin(Bin) when is_binary(Bin) -> to_bin(Bin) when is_binary(Bin) ->
Bin; Bin;
to_bin(Atom) when is_atom(Atom) -> to_bin(Atom) when is_atom(Atom) ->