diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index d16488bc6..262774a24 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -700,3 +700,31 @@ t_table_removed(Config) -> ), connect_and_create_table(Config), ok. + +t_concurrent_health_checks(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + emqx_utils:pmap( + fun(_) -> + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + end, + lists:seq(1, 20) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)), + ok + end + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index c468aa8bd..04ba4fd51 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -62,6 +62,11 @@ prepare_statement := epgsql:statement() }. +%% FIXME: add `{error, sync_required}' to the return type of `epgsql:execute_batch'. +%% We want to be able to call sync if any message from the backend leaves the driver in an +%% inconsistent state needing sync; dialyzer is silenced for `execute_batch/3' meanwhile. +-dialyzer({nowarn_function, [execute_batch/3]}). 
+ %%===================================================================== roots() -> @@ -252,6 +257,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> reason => Reason }), case Reason of + sync_required -> + {error, {recoverable_error, Reason}}; ecpool_empty -> {error, {recoverable_error, Reason}}; {error, error, _, undefined_table, _, _} -> @@ -307,28 +314,13 @@ do_check_prepares( prepare_sql := #{<<"send_message">> := SQL} } = State ) -> - % it's already connected. Verify if target table still exists - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:foldl( - fun - (WorkerPid, ok) -> - case ecpool_worker:client(WorkerPid) of - {ok, Conn} -> - case epgsql:parse2(Conn, "get_status", SQL, []) of - {error, {_, _, _, undefined_table, _, _}} -> - {error, {undefined_table, State}}; - _ -> - ok - end; - _ -> - ok - end; - (_, Acc) -> - Acc - end, - ok, - Workers - ); + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + case validate_table_existence(WorkerPids, SQL) of + ok -> + ok; + {error, undefined_table} -> + {error, {undefined_table, State}} + end; do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> @@ -344,6 +336,30 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {error, Error} end. +-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}. 
+validate_table_existence([WorkerPid | Rest], SQL) -> + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case epgsql:parse2(Conn, "", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {error, undefined_table}; + Res when is_tuple(Res) andalso ok == element(1, Res) -> + ok; + Res -> + ?tp(postgres_connector_bad_parse2, #{result => Res}), + validate_table_existence(Rest, SQL) + end; + _ -> + validate_table_existence(Rest, SQL) + catch + exit:{noproc, _} -> + validate_table_existence(Rest, SQL) + end; +validate_table_existence([], _SQL) -> + %% No worker gave a verdict: each was either unavailable or replied with an + %% unexpected error. We will retry on the next health check. + ok. + %% =================================================================== connect(Opts) -> @@ -358,13 +374,31 @@ connect(Opts) -> end. query(Conn, SQL, Params) -> - epgsql:equery(Conn, SQL, Params). + case epgsql:equery(Conn, SQL, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. prepared_query(Conn, Name, Params) -> - epgsql:prepared_query2(Conn, Name, Params). + case epgsql:prepared_query2(Conn, Name, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. execute_batch(Conn, Statement, Params) -> - epgsql:execute_batch(Conn, Statement, Params). + case epgsql:execute_batch(Conn, Statement, Params) of + {error, sync_required} = Res -> + ok = epgsql:sync(Conn), + Res; + Res -> + Res + end. conn_opts(Opts) -> conn_opts(Opts, []). diff --git a/changes/ee/fix-11338.en.md b/changes/ee/fix-11338.en.md new file mode 100644 index 000000000..ed1924c13 --- /dev/null +++ b/changes/ee/fix-11338.en.md @@ -0,0 +1 @@ +Fixed an issue where the PostgreSQL bridge connection could crash under high message rates.