diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index f4917f387..e2f5ac868 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -135,6 +135,7 @@ end_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), ok = snabbkaffe:stop(), delete_bridge(Config), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -715,6 +716,87 @@ t_missing_table(Config) -> connect_and_create_table(Config), ok. +%% We test that we can handle when the prepared statement with the channel +%% name already exists in the connection instance when we try to make a new +%% prepared statement. It is unknown in which scenario this can happen but it +%% has been observed in a production log file. +%% See: +%% https://emqx.atlassian.net/browse/EEC-1036 +t_prepared_statement_exists(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + emqx_common_test_helpers:on_exit(fun() -> + meck:unload() + end), + MeckOpts = [passthrough, no_link, no_history, non_strict], + meck:new(emqx_postgresql, MeckOpts), + InsertPrepStatementDupAndThenRemoveMeck = + fun(Conn, Key, SQL, List) -> + meck:passthrough([Conn, Key, SQL, List]), + meck:delete( + epgsql, + parse2, + 4 + ), + meck:passthrough([Conn, Key, SQL, List]) + end, + meck:expect( + epgsql, + parse2, + InsertPrepStatementDupAndThenRemoveMeck + ), + %% We should recover if the prepared statement name already exists in the + %% driver + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == connected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + 
ok + end + ), + InsertPrepStatementDup = + fun(Conn, Key, SQL, List) -> + meck:passthrough([Conn, Key, SQL, List]), + meck:passthrough([Conn, Key, SQL, List]) + end, + meck:expect( + epgsql, + parse2, + InsertPrepStatementDup + ), + %% We should get status disconnected if removing the already existing statement doesn't help + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + snabbkaffe_nemesis:cleanup(), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + ok. + t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 7fe564dc3..54bce1006 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -268,7 +268,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% We ignore errors since any error probably means that the - %% prepared statement doesn't exist. + %% prepared statement doesn't exist. If it exists when we try + %% to insert one with the same name, we will try to remove it + %% again anyway. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> Statement = get_templated_statement(ChannelId, State), @@ -689,17 +691,21 @@ do_prepare_sql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). + prepare_sql_to_conn(Conn, Prepares, #{}, 0). 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) -> + failed_to_remove_prev_prepared_statement_error(); +prepare_sql_to_conn( + Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts +) when is_pid(Conn) -> LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> - prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); - {error, {error, error, _, undefined_table, _, _} = Error} -> + prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); + {error, #error{severity = error, codename = undefined_table} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}), LogMsg = @@ -709,6 +715,30 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when ), ?SLOG(error, LogMsg), {error, undefined_table}; + {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error -> + ?tp(pgsql_prepared_statement_exists, #{}), + LogMsg = + maps:merge( + LogMeta#{ + msg => "postgresql_prepared_statment_with_same_name_already_exists", + explain => << + "A prepared statement with the same name already " + "exists in the driver. Will attempt to remove the " + "previous prepared statement with the name and then " + "try again." 
+ >> + }, + translate_to_log_context(Error) + ), + ?SLOG(warning, LogMsg), + case epgsql:close(Conn, statement, Key) of + ok -> + ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}), + prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); + {error, CloseError} -> + ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}), + failed_to_remove_prev_prepared_statement_error() + end; {error, Error} -> TranslatedError = translate_to_log_context(Error), LogMsg = @@ -720,6 +750,13 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when {error, export_error(TranslatedError)} end. +failed_to_remove_prev_prepared_statement_error() -> + Msg = + ("A previous prepared statement for the action already exists " + "but cannot be closed. Please, try to disable and then enable " + "the connector to resolve this issue."), + {error, unicode:characters_to_binary(Msg)}. + to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) ->