From 7a16ff4f04e15112bb40ef8c7c560243a73e4d72 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 24 Jul 2023 17:47:17 -0300 Subject: [PATCH] fix(postgres_bridge): fix table existence check and handle sync_required Fixes https://emqx.atlassian.net/browse/EMQX-10629 During health checking, we check whether tables in the SQL statement exist. This check was done by asking the backend to parse the statement using a named prepared statement. Concurrent health checks could then result in the error: ```erlang {error,{error,error,<<"42P05">>,duplicate_prepared_statement,<<"prepared statement \"get_status\" already exists">>,[{file,<<"prepare.c">>},{line,<<"451">>},{routine,<<"StorePreparedStatement">>},{severity,<<"ERROR">>}]}} ``` This could lead to an inconsistent state in the driver process, which would crash later when a message from the backend (`READY_FOR_QUERY`, "idle") arrived: ``` 2023-07-24T13:05:58.892043+00:00 [error] Generic server <0.2134.0> terminating. Reason: {'module could not be loaded',[{undefined,handle_message,[90,<<"I">>,... ``` Added calls to `epgsql:sync/1` for functions that could return `{error, sync_required}`. Also, redundant calls to `parse2` were removed to reduce the number of requests. --- .../test/emqx_bridge_pgsql_SUITE.erl | 28 +++++++ .../src/emqx_connector_pgsql.erl | 84 +++++++++++++------ changes/ee/fix-11338.en.md | 1 + 3 files changed, 88 insertions(+), 25 deletions(-) create mode 100644 changes/ee/fix-11338.en.md diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index d16488bc6..262774a24 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -700,3 +700,31 @@ t_table_removed(Config) -> ), connect_and_create_table(Config), ok. 
+ +t_concurrent_health_checks(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + emqx_utils:pmap( + fun(_) -> + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + end, + lists:seq(1, 20) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)), + ok + end + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index c468aa8bd..04ba4fd51 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -62,6 +62,11 @@ prepare_statement := epgsql:statement() }. +%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' +%% We want to be able to call sync if any message from the backend leaves the driver in an +%% inconsistent state needing sync. +-dialyzer({nowarn_function, [execute_batch/3]}). + %%===================================================================== roots() -> @@ -252,6 +257,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> reason => Reason }), case Reason of + sync_required -> + {error, {recoverable_error, Reason}}; ecpool_empty -> {error, {recoverable_error, Reason}}; {error, error, _, undefined_table, _, _} -> @@ -307,28 +314,13 @@ do_check_prepares( prepare_sql := #{<<"send_message">> := SQL} } = State ) -> - % it's already connected. 
Verify if target table still exists - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:foldl( - fun - (WorkerPid, ok) -> - case ecpool_worker:client(WorkerPid) of - {ok, Conn} -> - case epgsql:parse2(Conn, "get_status", SQL, []) of - {error, {_, _, _, undefined_table, _, _}} -> - {error, {undefined_table, State}}; - _ -> - ok - end; - _ -> - ok - end; - (_, Acc) -> - Acc - end, - ok, - Workers - ); + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + case validate_table_existence(WorkerPids, SQL) of + ok -> + ok; + {error, undefined_table} -> + {error, {undefined_table, State}} + end; do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> @@ -344,6 +336,30 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {error, Error} end. +-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}. +validate_table_existence([WorkerPid | Rest], SQL) -> + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case epgsql:parse2(Conn, "", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {error, undefined_table}; + Res when is_tuple(Res) andalso ok == element(1, Res) -> + ok; + Res -> + ?tp(postgres_connector_bad_parse2, #{result => Res}), + validate_table_existence(Rest, SQL) + end; + _ -> + validate_table_existence(Rest, SQL) + catch + exit:{noproc, _} -> + validate_table_existence(Rest, SQL) + end; +validate_table_existence([], _SQL) -> + %% All workers either replied an unexpected error; we will retry + %% on the next health check. + ok. + %% =================================================================== connect(Opts) -> @@ -358,13 +374,31 @@ connect(Opts) -> end. query(Conn, SQL, Params) -> - epgsql:equery(Conn, SQL, Params). 
+ case epgsql:equery(Conn, SQL, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. prepared_query(Conn, Name, Params) -> - epgsql:prepared_query2(Conn, Name, Params). + case epgsql:prepared_query2(Conn, Name, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. execute_batch(Conn, Statement, Params) -> - epgsql:execute_batch(Conn, Statement, Params). + case epgsql:execute_batch(Conn, Statement, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. conn_opts(Opts) -> conn_opts(Opts, []). diff --git a/changes/ee/fix-11338.en.md b/changes/ee/fix-11338.en.md new file mode 100644 index 000000000..ed1924c13 --- /dev/null +++ b/changes/ee/fix-11338.en.md @@ -0,0 +1 @@ +Fixed an issue where the PostgreSQL bridge connection could crash under high message rates.