From f2ccfff803b4d8c5e71a56dfb39c4470c4d03a55 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 3 Jun 2024 16:14:26 +0200 Subject: [PATCH] fix(pgsql connector): handle prepared statement already exists In a user's log file it was found that the pgsql driver can end up in a situation where the prepared statement for a channel/action is not properly removed before a channel with the same name as the prepared statement is added to the connector. This commit handles this by attempting to remove the old prepared statement if one already exists when adding a channel. Related issue: https://emqx.atlassian.net/browse/EEC-1036 --- .../test/emqx_bridge_pgsql_SUITE.erl | 53 +++++++++++++++++ .../src/emqx_postgresql.app.src | 2 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 59 +++++++++++++++++-- 3 files changed, 108 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index f4917f387..fb9341ddb 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -715,6 +715,59 @@ t_missing_table(Config) -> connect_and_create_table(Config), ok. 
+t_prepared_statement_exists(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + %% We should recover if the prepared statement name already exists in the + %% driver + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(1) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == connected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + %% We should get status disconnected if removing the already existing statement doesn't help + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(30) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + snabbkaffe_nemesis:cleanup(), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + ok. 
+ t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 5faf0aa47..2cf3392bf 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ad674a07c..e3ce3c479 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -240,7 +240,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% We ignore errors since any error probably means that the - %% prepared statement doesn't exist. + %% prepared statement doesn't exist. If it exists when we try + %% to insert one with the same name, we will try to remove it + %% again anyway. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> Statement = get_prepared_statement(ChannelId, State), @@ -648,16 +650,21 @@ do_prepare_sql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). + prepare_sql_to_conn(Conn, Prepares, #{}, 0). 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, 3) when is_pid(Conn) -> + {error, {failed_to_remove_prev_prepared_statement, Key}}; +prepare_sql_to_conn( + Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts +) when is_pid(Conn) -> LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), + test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> - prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); + prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); {error, {error, error, _, undefined_table, _, _} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}), @@ -668,6 +675,29 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when ), ?SLOG(error, LogMsg), {error, undefined_table}; + {error, {error, error, _, duplicate_prepared_statement, _, _}} = Error -> + ?tp(pgsql_prepared_statement_exists, #{}), + LogMsg = + maps:merge( + LogMeta#{ + msg => "postgresql_prepared_statment_with_same_name_already_exists", + explain => << + "A prepared statement with the same name already " + "exists in the driver. Will attempt to remove the " + "previous prepared statement with the name and then " + "try again." 
+ >> + }, + translate_to_log_context(Error) + ), + ?SLOG(warning, LogMsg), + case epgsql:close(Conn, statement, Key) of + ok -> + ?SLOG(info, #{msg => "pqsql_closed_statement_succefully"}); + {error, CloseError} -> + ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => CloseError}) + end, + prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); {error, Error} -> TranslatedError = translate_to_log_context(Error), LogMsg = @@ -679,6 +709,25 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when {error, export_error(TranslatedError)} end. +-ifdef(TEST). +test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL) -> + try + %% In test we inject a crash in the following trace point to test the + %% scenario when the prepared statement already exists. It is unknown + %% in which scenario this can happen but it has been observed in a + %% production log file. See: + %% https://emqx.atlassian.net/browse/EEC-1036 + ?tp(pgsql_fake_prepare_statement_exists, #{}) + catch + _:_ -> + epgsql:parse2(Conn, Key, SQL, []) + end, + ok. +-else. +test_maybe_inject_prepared_statement_already_exists(_Conn, _Key, _SQL) -> + ok. +-endif. + to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) ->