diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index f4917f387..fb9341ddb 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -715,6 +715,59 @@ t_missing_table(Config) -> connect_and_create_table(Config), ok. +t_prepared_statement_exists(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + %% We should recover if the prepared statement name already exists in the + %% driver + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(1) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == connected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + %% We should get status disconnected if removing already existing statment don't help + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(30) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + snabbkaffe_nemesis:cleanup(), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + ok. + t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 5faf0aa47..2cf3392bf 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ad674a07c..e3ce3c479 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -240,7 +240,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% We ignore errors since any error probably means that the - %% prepared statement doesn't exist. + %% prepared statement doesn't exist. If it exists when we try + %% to insert one with the same name, we will try to remove it + %% again anyway. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> Statement = get_prepared_statement(ChannelId, State), @@ -648,16 +650,21 @@ do_prepare_sql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). + prepare_sql_to_conn(Conn, Prepares, #{}, 0). -prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, 3) when is_pid(Conn) -> + {error, {failed_to_remove_prev_prepared_statement, Key}}; +prepare_sql_to_conn( + Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts +) when is_pid(Conn) -> LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), + test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> - prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); + prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); {error, {error, error, _, undefined_table, _, _} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}), @@ -668,6 +675,29 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when ), ?SLOG(error, LogMsg), {error, undefined_table}; + {error, {error, error, _, duplicate_prepared_statement, _, _}} = Error -> + ?tp(pgsql_prepared_statement_exists, #{}), + LogMsg = + maps:merge( + LogMeta#{ + msg => "postgresql_prepared_statment_with_same_name_already_exists", + explain => << + "A prepared statement with the same name already " + "exists in the driver. Will attempt to remove the " + "previous prepared statement with the name and then " + "try again." + >> + }, + translate_to_log_context(Error) + ), + ?SLOG(warning, LogMsg), + case epgsql:close(Conn, statement, Key) of + ok -> + ?SLOG(info, #{msg => "pqsql_closed_statement_succefully"}); + {error, Error} -> + ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => Error}) + end, + prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); {error, Error} -> TranslatedError = translate_to_log_context(Error), LogMsg = @@ -679,6 +709,25 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when {error, export_error(TranslatedError)} end. +-ifdef(TEST). +test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL) -> + try + %% In test we inject a crash in the following trace point to test the + %% scenario when the prepared statement already exists. It is unknkown + %% in which scenario this can happen but it has been observed in a + %% production log file. See: + %% https://emqx.atlassian.net/browse/EEC-1036 + ?tp(pgsql_fake_prepare_statement_exists, #{}) + catch + _:_ -> + epgsql:parse2(Conn, Key, SQL, []) + end, + ok. +-else. +test_maybe_inject_prepared_statement_already_exists(_Conn, _Key, _SQL) -> + ok. +-endif. + to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) ->