From 9454af9a8b8098a928172fe0cd6e3942fcc4c0ae Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Thu, 4 May 2023 20:36:27 -0300 Subject: [PATCH] feat(postgresql): check whether target table exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- .../test/emqx_bridge_pgsql_SUITE.erl | 67 +++++++++++++++++++ .../src/emqx_connector_pgsql.erl | 45 ++++++++++++- apps/emqx_resource/src/emqx_resource.erl | 14 ++-- .../src/emqx_resource_buffer_worker.erl | 2 + 4 files changed, 120 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 4e7da85bd..6806328d6 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -257,6 +257,12 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). +query_resource_sync(Config, Request) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + query_resource_async(Config, Request) -> query_resource_async(Config, Request, _Opts = #{}). @@ -634,3 +640,64 @@ t_nasty_sql_string(Config) -> 1_000 ), ?assertEqual(Payload, connect_and_get_payload(Config)). + +t_missing_table(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?check_trace( + begin + connect_and_drop_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status == connecting orelse Status == disconnected, + emqx_resource_manager:health_check(ResourceID) + ) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, {resource_error, #{reason := unhealthy_target}}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)), + ok + end + ), + connect_and_create_table(Config), + ok. + +t_table_removed(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + ?assertMatch( + {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}}, + query_resource_sync(Config, {send_message, SentData, []}) + ), + ok + end, + [] + ), + connect_and_create_table(Config), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 8fb60f102..e0d3e834f 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -238,6 +238,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> case Reason of ecpool_empty -> {error, {recoverable_error, Reason}}; + {error, error, _, undefined_table, _, _} -> + {error, {unrecoverable_error, Reason}}; _ -> Result end; @@ -271,7 +273,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {connected, NState}; false -> %% do not log error, it is logged in prepare_sql_to_conn - connecting + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> connecting @@ -280,6 +285,30 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). +do_check_prepares( + #{ + pool_name := PoolName, + prepare_sql := #{<<"send_message">> := SQL} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + {ok, Conn} = ecpool_worker:client(WorkerPid), + case epgsql:parse2(Conn, "get_status", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {undefined_table, State}; + _ -> + ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> @@ -288,6 +317,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {ok, Sts} -> %% remove the error {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_sql => {error, Prepares}}}; _Error -> false end. @@ -373,7 +405,7 @@ init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> msg => <<"PostgreSQL init prepare statement failed">>, error => Error }, ?SLOG(error, LogMeta), - %% mark the prepare_sqlas failed + %% mark the prepare_sql as failed State#{prepare_sql => {error, Prepares}} end end. @@ -414,6 +446,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + {error, {error, error, _, undefined_table, _, _} = Error} -> + %% Target table is not created + ?tp(pgsql_undefined_table, #{}), + ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), + {error, undefined_table}; {error, Error} = Other -> ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), Other @@ -424,6 +461,10 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, {recoverable_error, _Error}} = Res) -> + Res; +handle_result({error, {unrecoverable_error, _Error}} = Res) -> + Res; handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5a8eab324..7b0f387f5 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -278,20 +278,22 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM}} -> - case QM of - simple_async -> + {ok, _Group, #{query_mode := QM, error := Error}} -> + case {QM, Error} of + {_, unhealthy_target} -> + ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); + {simple_async, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again Opts1 = Opts#{is_buffer_supported => true}, emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); - simple_sync -> + {simple_sync, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request); - sync -> + {sync, _} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - async -> + {async, _} -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 7a0bcaea9..0bbdb3a0a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -985,6 +985,8 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); + {ok, _Group, #{status := connecting, error := unhealthy_target}} -> + {error, {unrecoverable_error, unhealthy_target}}; {ok, _Group, Resource} -> do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} ->