From f2ccfff803b4d8c5e71a56dfb39c4470c4d03a55 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 3 Jun 2024 16:14:26 +0200 Subject: [PATCH 1/7] fix(pgsql connector): handle prepared statement already exists In a user's log file it was found that the pgsql driver can end up in a situation where the prepared statement for a channel/action is not properly removed before a channel with the same name as the prepared statement is added to the connector. This commit handles this by attempting to remove the old prepared statement if one already exists when adding a channel. Related issue: https://emqx.atlassian.net/browse/EEC-1036 --- .../test/emqx_bridge_pgsql_SUITE.erl | 53 +++++++++++++++++ .../src/emqx_postgresql.app.src | 2 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 59 +++++++++++++++++-- 3 files changed, 108 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index f4917f387..fb9341ddb 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -715,6 +715,59 @@ t_missing_table(Config) -> connect_and_create_table(Config), ok. 
+t_prepared_statement_exists(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + %% We should recover if the prepared statement name already exists in the + %% driver + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(1) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == connected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + %% We should get status disconnected if removing already existing statment don't help + ?check_trace( + begin + ?inject_crash( + #{?snk_kind := pgsql_fake_prepare_statement_exists}, + snabbkaffe_nemesis:recover_after(30) + ), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := Status} when Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) + ) + ), + snabbkaffe_nemesis:cleanup(), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)), + ok + end + ), + ok. 
+ t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 5faf0aa47..2cf3392bf 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ad674a07c..e3ce3c479 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -240,7 +240,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% We ignore errors since any error probably means that the - %% prepared statement doesn't exist. + %% prepared statement doesn't exist. If it exists when we try + %% to insert one with the same name, we will try to remove it + %% again anyway. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> Statement = get_prepared_statement(ChannelId, State), @@ -648,16 +650,21 @@ do_prepare_sql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). + prepare_sql_to_conn(Conn, Prepares, #{}, 0). 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, 3) when is_pid(Conn) -> + {error, {failed_to_remove_prev_prepared_statement, Key}}; +prepare_sql_to_conn( + Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts +) when is_pid(Conn) -> LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), + test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL), case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> - prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}); + prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); {error, {error, error, _, undefined_table, _, _} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}), @@ -668,6 +675,29 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when ), ?SLOG(error, LogMsg), {error, undefined_table}; + {error, {error, error, _, duplicate_prepared_statement, _, _}} = Error -> + ?tp(pgsql_prepared_statement_exists, #{}), + LogMsg = + maps:merge( + LogMeta#{ + msg => "postgresql_prepared_statment_with_same_name_already_exists", + explain => << + "A prepared statement with the same name already " + "exists in the driver. Will attempt to remove the " + "previous prepared statement with the name and then " + "try again." 
+ >> + }, + translate_to_log_context(Error) + ), + ?SLOG(warning, LogMsg), + case epgsql:close(Conn, statement, Key) of + ok -> + ?SLOG(info, #{msg => "pqsql_closed_statement_succefully"}); + {error, Error} -> + ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => Error}) + end, + prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); {error, Error} -> TranslatedError = translate_to_log_context(Error), LogMsg = @@ -679,6 +709,25 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when {error, export_error(TranslatedError)} end. +-ifdef(TEST). +test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL) -> + try + %% In test we inject a crash in the following trace point to test the + %% scenario when the prepared statement already exists. It is unknkown + %% in which scenario this can happen but it has been observed in a + %% production log file. See: + %% https://emqx.atlassian.net/browse/EEC-1036 + ?tp(pgsql_fake_prepare_statement_exists, #{}) + catch + _:_ -> + epgsql:parse2(Conn, Key, SQL, []) + end, + ok. +-else. +test_maybe_inject_prepared_statement_already_exists(_Conn, _Key, _SQL) -> + ok. +-endif. 
+ to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) -> From 572ca6433eafcd27ac0de33389ce1cf0aa3384f9 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 4 Jun 2024 09:46:55 +0200 Subject: [PATCH 2/7] fix(pgsql connector): improvements due to suggestions from @thalesmg Co-authored-by: Thales Macedo Garitezi --- apps/emqx_postgresql/src/emqx_postgresql.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index e3ce3c479..9553b3ceb 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -654,7 +654,7 @@ prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, 3) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, _MaxAttempts = 3) when is_pid(Conn) -> {error, {failed_to_remove_prev_prepared_statement, Key}}; prepare_sql_to_conn( Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts @@ -665,7 +665,7 @@ prepare_sql_to_conn( case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); - {error, {error, error, _, undefined_table, _, _} = Error} -> + {error, #error{severity = error, codename = undefined_table} = Error} -> %% Target table is not created ?tp(pgsql_undefined_table, #{}), LogMsg = @@ -675,7 +675,7 @@ prepare_sql_to_conn( ), ?SLOG(error, LogMsg), {error, undefined_table}; - {error, {error, error, _, duplicate_prepared_statement, _, _}} = Error -> + {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error -> ?tp(pgsql_prepared_statement_exists, #{}), LogMsg = maps:merge( @@ -693,7 +693,7 @@ prepare_sql_to_conn( ?SLOG(warning, LogMsg), case epgsql:close(Conn, statement, Key) of ok -> - 
?SLOG(info, #{msg => "pqsql_closed_statement_succefully"}); + ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}); {error, Error} -> ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => Error}) end, From 336089f8a7fd5d24c83deeaad3a49051d2204467 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 5 Jun 2024 15:53:02 +0200 Subject: [PATCH 3/7] fix: bug found by dialyzer and make test case cleaner --- .../test/emqx_bridge_pgsql_SUITE.erl | 44 +++++++++++++++---- apps/emqx_postgresql/src/emqx_postgresql.erl | 24 +--------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index fb9341ddb..c6eb99f83 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -715,17 +715,39 @@ t_missing_table(Config) -> connect_and_create_table(Config), ok. +%% We test that we can handle when the prepared statement with the channel +%% name already exists in the connection instance when we try to make a new +%% prepared statement. It is unknown in which scenario this can happen but it +%% has been observed in a production log file. 
+%% See: +%% https://emqx.atlassian.net/browse/EEC-1036 t_prepared_statement_exists(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), + emqx_common_test_helpers:on_exit(fun() -> + meck:unload() + end), + MeckOpts = [passthrough, no_link, no_history, non_strict], + meck:new(emqx_postgresql, MeckOpts), + InsertPrepStatementDupAndThenRemoveMeck = + fun(Conn, Key, SQL, List) -> + meck:passthrough([Conn, Key, SQL, List]), + meck:delete( + epgsql, + parse2, + 4 + ), + meck:passthrough([Conn, Key, SQL, List]) + end, + meck:expect( + epgsql, + parse2, + InsertPrepStatementDupAndThenRemoveMeck + ), %% We should recover if the prepared statement name already exists in the %% driver ?check_trace( begin - ?inject_crash( - #{?snk_kind := pgsql_fake_prepare_statement_exists}, - snabbkaffe_nemesis:recover_after(1) - ), ?assertMatch({ok, _}, create_bridge(Config)), ?retry( _Sleep = 1_000, @@ -742,13 +764,19 @@ t_prepared_statement_exists(Config) -> ok end ), + InsertPrepStatementDup = + fun(Conn, Key, SQL, List) -> + meck:passthrough([Conn, Key, SQL, List]), + meck:passthrough([Conn, Key, SQL, List]) + end, + meck:expect( + epgsql, + parse2, + InsertPrepStatementDup + ), %% We should get status disconnected if removing already existing statment don't help ?check_trace( begin - ?inject_crash( - #{?snk_kind := pgsql_fake_prepare_statement_exists}, - snabbkaffe_nemesis:recover_after(30) - ), ?assertMatch({ok, _}, create_bridge(Config)), ?retry( _Sleep = 1_000, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 9553b3ceb..ea83b951e 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -661,7 +661,6 @@ prepare_sql_to_conn( ) when is_pid(Conn) -> LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL}, ?SLOG(info, LogMeta), - test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL), case 
epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0); @@ -694,8 +693,8 @@ prepare_sql_to_conn( case epgsql:close(Conn, statement, Key) of ok -> ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}); - {error, Error} -> - ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => Error}) + {error, CloseError} -> + ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => CloseError}) end, prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); {error, Error} -> @@ -709,25 +708,6 @@ prepare_sql_to_conn( {error, export_error(TranslatedError)} end. --ifdef(TEST). -test_maybe_inject_prepared_statement_already_exists(Conn, Key, SQL) -> - try - %% In test we inject a crash in the following trace point to test the - %% scenario when the prepared statement already exists. It is unknkown - %% in which scenario this can happen but it has been observed in a - %% production log file. See: - %% https://emqx.atlassian.net/browse/EEC-1036 - ?tp(pgsql_fake_prepare_statement_exists, #{}) - catch - _:_ -> - epgsql:parse2(Conn, Key, SQL, []) - end, - ok. --else. -test_maybe_inject_prepared_statement_already_exists(_Conn, _Key, _SQL) -> - ok. --endif. 
- to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) -> From 2956e849eb4eac34897ef0f96a63cd81131a0e62 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 5 Jun 2024 16:58:14 +0200 Subject: [PATCH 4/7] fix(pgsql connector): better msg when failing to remove statement --- apps/emqx_postgresql/src/emqx_postgresql.erl | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ea83b951e..e0add780c 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -654,8 +654,8 @@ prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, _MaxAttempts = 3) when is_pid(Conn) -> - {error, {failed_to_remove_prev_prepared_statement, Key}}; +prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) -> + failed_to_remove_prev_prepared_statement_error(); prepare_sql_to_conn( Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts ) when is_pid(Conn) -> @@ -692,11 +692,12 @@ prepare_sql_to_conn( ?SLOG(warning, LogMsg), case epgsql:close(Conn, statement, Key) of ok -> - ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}); + ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}), + prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); {error, CloseError} -> - ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => CloseError}) - end, - prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1); + ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}), + failed_to_remove_prev_prepared_statement_error() + end; {error, Error} -> TranslatedError = translate_to_log_context(Error), LogMsg = @@ -708,6 +709,13 @@ prepare_sql_to_conn( {error, 
export_error(TranslatedError)} end. +failed_to_remove_prev_prepared_statement_error() -> + Msg = + ("A previous prepared statement for the action already exists and " + "we are not able to close it. Please, try to disable and then enable " + "the connector to resolve this issue."), + {error, unicode:charactes_to_binary(Msg)}. + to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) -> From e63dcc84b071ced9ba477e7bf9bc3ca5418360a8 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 7 Jun 2024 12:20:42 +0200 Subject: [PATCH 5/7] fix: unused variable and better error message --- apps/emqx_postgresql/src/emqx_postgresql.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index e0add780c..b8caf36af 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -654,7 +654,7 @@ prepare_sql_to_conn(Conn, Prepares) -> prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) -> failed_to_remove_prev_prepared_statement_error(); prepare_sql_to_conn( Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts @@ -711,8 +711,8 @@ prepare_sql_to_conn( failed_to_remove_prev_prepared_statement_error() -> Msg = - ("A previous prepared statement for the action already exists and " - "we are not able to close it. Please, try to disable and then enable " + ("A previous prepared statement for the action already exists " + "but cannot be closed. Please, try to disable and then enable " "the connector to resolve this issue."), {error, unicode:charactes_to_binary(Msg)}. 
From c13631102eb6c63a75cdf5cf50fb8379f954d9d3 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 7 Jun 2024 19:40:18 +0200 Subject: [PATCH 6/7] fix(PgSQL connector): typo in function name --- apps/emqx_postgresql/src/emqx_postgresql.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index b8caf36af..6f5200fb0 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -714,7 +714,7 @@ failed_to_remove_prev_prepared_statement_error() -> ("A previous prepared statement for the action already exists " "but cannot be closed. Please, try to disable and then enable " "the connector to resolve this issue."), - {error, unicode:charactes_to_binary(Msg)}. + {error, unicode:characters_to_binary(Msg)}. to_bin(Bin) when is_binary(Bin) -> Bin; From a885f0b41af862bf3bebd4ec8828c60934271612 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 10 Jun 2024 13:45:27 +0200 Subject: [PATCH 7/7] test(emqx_bridge_pgsql_SUITE): call test janitor --- apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index c6eb99f83..e2f5ac868 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -135,6 +135,7 @@ end_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), ok = snabbkaffe:stop(), delete_bridge(Config), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------