From 9454af9a8b8098a928172fe0cd6e3942fcc4c0ae Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Thu, 4 May 2023 20:36:27 -0300 Subject: [PATCH 1/4] feat(postgresql): check whether target table exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- .../test/emqx_bridge_pgsql_SUITE.erl | 67 +++++++++++++++++++ .../src/emqx_connector_pgsql.erl | 45 ++++++++++++- apps/emqx_resource/src/emqx_resource.erl | 14 ++-- .../src/emqx_resource_buffer_worker.erl | 2 + 4 files changed, 120 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 4e7da85bd..6806328d6 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -257,6 +257,12 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). +query_resource_sync(Config, Request) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + query_resource_async(Config, Request) -> query_resource_async(Config, Request, _Opts = #{}). @@ -634,3 +640,64 @@ t_nasty_sql_string(Config) -> 1_000 ), ?assertEqual(Payload, connect_and_get_payload(Config)). + +t_missing_table(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?check_trace( + begin + connect_and_drop_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status == connecting orelse Status == disconnected, + emqx_resource_manager:health_check(ResourceID) + ) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, {resource_error, #{reason := unhealthy_target}}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)), + ok + end + ), + connect_and_create_table(Config), + ok. + +t_table_removed(Config) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + ?assertMatch( + {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}}, + query_resource_sync(Config, {send_message, SentData, []}) + ), + ok + end, + [] + ), + connect_and_create_table(Config), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 8fb60f102..e0d3e834f 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -238,6 +238,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> case Reason of ecpool_empty -> {error, {recoverable_error, Reason}}; + {error, error, _, undefined_table, _, _} -> + {error, {unrecoverable_error, Reason}}; _ -> Result end; @@ -271,7 +273,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {connected, NState}; false -> %% do not log error, it is logged in prepare_sql_to_conn - connecting + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> connecting @@ -280,6 +285,30 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). +do_check_prepares( + #{ + pool_name := PoolName, + prepare_sql := #{<<"send_message">> := SQL} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + {ok, Conn} = ecpool_worker:client(WorkerPid), + case epgsql:parse2(Conn, "get_status", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {undefined_table, State}; + _ -> + ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> @@ -288,6 +317,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {ok, Sts} -> %% remove the error {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_sql => {error, Prepares}}}; _Error -> false end. @@ -373,7 +405,7 @@ init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> msg => <<"PostgreSQL init prepare statement failed">>, error => Error }, ?SLOG(error, LogMeta), - %% mark the prepare_sqlas failed + %% mark the prepare_sql as failed State#{prepare_sql => {error, Prepares}} end end. @@ -414,6 +446,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co case epgsql:parse2(Conn, Key, SQL, []) of {ok, Statement} -> prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + {error, {error, error, _, undefined_table, _, _} = Error} -> + %% Target table is not created + ?tp(pgsql_undefined_table, #{}), + ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), + {error, undefined_table}; {error, Error} = Other -> ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), Other @@ -424,6 +461,10 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, {recoverable_error, _Error}} = Res) -> + Res; +handle_result({error, {unrecoverable_error, _Error}} = Res) -> + Res; handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5a8eab324..7b0f387f5 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -278,20 +278,22 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM}} -> - case QM of - simple_async -> + {ok, _Group, #{query_mode := QM, error := Error}} -> + case {QM, Error} of + {_, unhealthy_target} -> + ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); + {simple_async, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again Opts1 = Opts#{is_buffer_supported => true}, emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); - simple_sync -> + {simple_sync, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request); - sync -> + {sync, _} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - async -> + {async, _} -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 7a0bcaea9..0bbdb3a0a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -985,6 +985,8 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); + {ok, _Group, #{status := connecting, error := unhealthy_target}} -> + {error, {unrecoverable_error, unhealthy_target}}; {ok, _Group, Resource} -> do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} -> From c9a2ddf98c6570167ed9661edec8f90616e8028f Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Fri, 5 May 2023 15:42:31 -0300 Subject: [PATCH 2/4] feat(mysql): check whether target table exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- .../src/emqx_connector_mysql.erl | 45 +++++++++++- .../test/emqx_ee_bridge_mysql_SUITE.erl | 71 +++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 9c40919f2..4a3ad1f3b 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -224,7 +224,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {connected, NState}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> connecting @@ -233,7 +236,37 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> do_get_status(Conn) -> ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). -do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> +do_check_prepares( + #{ + pool_name := PoolName, + prepare_statement := #{send_message := SQL} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case mysql:prepare(Conn, get_status, SQL) of + {error, {1146, _, _}} -> + {undefined_table, State}; + {ok, Statement} -> + mysql:unprepare(Conn, Statement); + _ -> + ok + end; + _ -> + ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); +do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) -> %% retry to prepare @@ -241,6 +274,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, ok -> %% remove the error {ok, State#{prepare_statement => Prepares}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_statement => {error, Prepares}}}; {error, Reason} -> {error, Reason} end. @@ -320,6 +356,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> {ok, _Key} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); + {error, {1146, _, _} = Reason} -> + %% Target table is not created + ?tp(mysql_undefined_table, #{}), + ?SLOG(error, LogMeta#{result => failed, reason => Reason}), + {error, undefined_table}; {error, Reason} -> % FIXME: we should try to differ on transient failers and % syntax failures. Retrying syntax failures is not very productive. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 4af180a2f..d8489b1b9 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -110,6 +110,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + connect_and_create_table(Config), connect_and_clear_table(Config), delete_bridge(Config), snabbkaffe:start_trace(), @@ -241,6 +242,12 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 500}). +sync_query_resource(Config, Request) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + query_resource_async(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), @@ -480,6 +487,7 @@ t_write_timeout(Config) -> ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), {ok, _} = create_bridge(Config), + connect_and_create_table(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, Timeout = 1000, @@ -641,6 +649,7 @@ t_workload_fits_prepared_statement_limit(Config) -> ). t_unprepared_statement_query(Config) -> + ok = connect_and_create_table(Config), ?assertMatch( {ok, _}, create_bridge(Config) @@ -668,6 +677,7 @@ t_unprepared_statement_query(Config) -> %% Test doesn't work with batch enabled since batch doesn't use %% prepared statements as such; it has its own query generation process t_uninitialized_prepared_statement(Config) -> + connect_and_create_table(Config), ?assertMatch( {ok, _}, create_bridge(Config) @@ -705,3 +715,64 @@ t_uninitialized_prepared_statement(Config) -> end ), ok. + +t_missing_table(Config) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?check_trace( + begin + connect_and_drop_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status == connecting orelse Status == disconnected, + emqx_resource_manager:health_check(ResourceID) + ) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, {resource_error, #{reason := unhealthy_target}}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)), + ok + end + ). + +t_table_removed(Config) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, + {unrecoverable_error, + {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, + sync_query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + [] + ), + ok. From 5f1093609181b74a7b2223c24bfd10667498bb5f Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Tue, 9 May 2023 19:49:12 -0300 Subject: [PATCH 3/4] feat(oracle): check whether target table exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- .../test/emqx_bridge_oracle_SUITE.erl | 80 +++++++-- apps/emqx_oracle/src/emqx_oracle.erl | 155 +++++++++++++----- 2 files changed, 185 insertions(+), 50 deletions(-) diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 06b0256e2..d7c7cec74 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -179,18 +179,39 @@ sql_drop_table() -> sql_check_table_exist() -> "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'". +new_jamdb_connection(Config) -> + JamdbOpts = [ + {host, ?config(oracle_host, Config)}, + {port, ?config(oracle_port, Config)}, + {user, "system"}, + {password, "oracle"}, + {sid, ?SID} + ], + jamdb_oracle:start(JamdbOpts). + +close_jamdb_connection(Conn) -> + jamdb_oracle:stop(Conn). + reset_table(Config) -> - ResourceId = resource_id(Config), - drop_table_if_exists(Config), - {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( - ResourceId, {sql, sql_create_table()} - ), + {ok, Conn} = new_jamdb_connection(Config), + try + ok = drop_table_if_exists(Conn), + {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table()) + after + close_jamdb_connection(Conn) + end, ok. +drop_table_if_exists(Conn) when is_pid(Conn) -> + {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()), + ok; drop_table_if_exists(Config) -> - ResourceId = resource_id(Config), - {ok, [{proc_result, 0, _}]} = - emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + {ok, Conn} = new_jamdb_connection(Config), + try + ok = drop_table_if_exists(Conn) + after + close_jamdb_connection(Conn) + end, ok. oracle_config(TestCase, _ConnectionType, Config) -> @@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) -> " pool_size = 1\n" " sql = \"~s\"\n" " resource_opts = {\n" - " health_check_interval = \"5s\"\n" + " health_check_interval = \"15s\"\n" " request_ttl = \"30s\"\n" " query_mode = \"async\"\n" " batch_size = 3\n" @@ -349,13 +370,13 @@ t_sync_query(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - reset_table(Config), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -381,13 +402,13 @@ t_batch_sync_query(Config) -> BridgeId = bridge_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, _Attempts = 30, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - reset_table(Config), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -464,6 +485,7 @@ t_start_stop(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -515,6 +537,7 @@ t_on_get_status(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), ResourceId = resource_id(Config), + reset_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) -> ), ok. +t_missing_table(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + drop_table_if_exists(Config), + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status =:= disconnected orelse Status =:= connecting, + emqx_resource_manager:health_check(ResourceId) + ) + ), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertMatch( + {error, {resource_error, #{reason := not_connected}}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + fun(Trace) -> + ?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)), + ok + end + ). + t_table_removed(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 61952b96b..2d190057c 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -9,8 +9,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(ORACLE_DEFAULT_PORT, 1521). - %%==================================================================== %% Exports %%==================================================================== @@ -26,7 +24,7 @@ ]). %% callbacks for ecpool --export([connect/1, prepare_sql_to_conn/2]). +-export([connect/1, prepare_sql_to_conn/3]). %% Internal exports used to execute code with ecpool worker -export([ @@ -39,18 +37,15 @@ oracle_host_options/0 ]). --define(ACTION_SEND_MESSAGE, send_message). - +-define(ORACLE_DEFAULT_PORT, 1521). -define(SYNC_QUERY_MODE, no_handover). - +-define(DEFAULT_POOL_SIZE, 8). +-define(OPT_TIMEOUT, 30000). +-define(MAX_CURSORS, 10). -define(ORACLE_HOST_OPTIONS, #{ default_port => ?ORACLE_DEFAULT_PORT }). --define(MAX_CURSORS, 10). --define(DEFAULT_POOL_SIZE, 8). --define(OPT_TIMEOUT, 30000). - -type prepares() :: #{atom() => binary()}. -type params_tokens() :: #{atom() => list()}. @@ -105,7 +100,7 @@ on_start( ], PoolName = InstId, Prepares = parse_prepare_sql(Config), - InitState = #{pool_name => PoolName, prepare_statement => #{}}, + InitState = #{pool_name => PoolName}, State = maps:merge(InitState, Prepares), case emqx_resource_pool:start(InstId, ?MODULE, Options) of ok -> @@ -148,7 +143,7 @@ on_query( on_batch_query( InstId, BatchReq, - #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State + #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State ) -> case BatchReq of [{Key, _} = Request | _] -> @@ -241,7 +236,13 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> connected; {ok, NState} -> %% return new state with prepared statements - {connected, NState} + {connected, NState}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> disconnected @@ -250,11 +251,42 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> do_get_status(Conn) -> ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")). -do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> - ok; -do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> - {ok, Sts} = prepare_sql(Prepares, PoolName), - {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}. +do_check_prepares( + #{ + pool_name := PoolName, + prepare_sql := #{<<"send_message">> := SQL}, + params_tokens := #{<<"send_message">> := Tokens} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + {ok, Conn} = ecpool_worker:client(WorkerPid), + case check_if_table_exists(Conn, SQL, Tokens) of + {error, undefined_table} -> {undefined_table, State}; + _ -> ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); +do_check_prepares( + State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap} +) -> + case prepare_sql(Prepares, PoolName, TokensMap) of + %% remove the error + {ok, Sts} -> + {ok, State#{prepare_sql => Sts}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_sql => {error, Prepares}}}; + {error, _Reason} = Error -> + Error + end. %% =================================================================== @@ -312,36 +344,81 @@ parse_prepare_sql([], Prepares, Tokens) -> params_tokens => Tokens }. -init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> - {ok, Sts} = prepare_sql(Prepares, PoolName), - State#{prepare_statement := Sts}. +init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) -> + case prepare_sql(Prepares, PoolName, TokensMap) of + {ok, Sts} -> + State#{prepare_sql := Sts}; + Error -> + LogMeta = #{ + msg => <<"Oracle Database init prepare statement failed">>, error => Error + }, + ?SLOG(error, LogMeta), + %% mark the prepare_sql as failed + State#{prepare_sql => {error, Prepares}} + end. -prepare_sql(Prepares, PoolName) when is_map(Prepares) -> - prepare_sql(maps:to_list(Prepares), PoolName); -prepare_sql(Prepares, PoolName) -> - Data = do_prepare_sql(Prepares, PoolName), - {ok, _Sts} = Data, - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), - Data. +prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName, TokensMap); +prepare_sql(Prepares, PoolName, TokensMap) -> + case do_prepare_sql(Prepares, PoolName, TokensMap) of + {ok, _Sts} = Ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Ok; + Error -> + Error + end. -do_prepare_sql(Prepares, PoolName) -> - do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). +do_prepare_sql(Prepares, PoolName, TokensMap) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}). -do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), - {ok, Sts} = prepare_sql_to_conn(Conn, Prepares), - do_prepare_sql(T, Prepares, PoolName, Sts); -do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + case prepare_sql_to_conn(Conn, Prepares, TokensMap) of + {ok, Sts} -> + do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts); + Error -> + Error + end; +do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) -> {ok, LastSts}. -prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). +prepare_sql_to_conn(Conn, Prepares, TokensMap) -> + prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}). -prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) -> LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL}, + Tokens = maps:get(Key, TokensMap, []), ?SLOG(info, LogMeta), - prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}). + case check_if_table_exists(Conn, SQL, Tokens) of + ok -> + ?SLOG(info, LogMeta#{result => success}), + prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL}); + {error, undefined_table} = Error -> + %% Target table is not created + ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}), + ?tp(oracle_undefined_table, #{}), + Error; + Error -> + Error + end. + +check_if_table_exists(Conn, SQL, Tokens) -> + {Event, _Headers} = emqx_rule_events:eventmsg_publish( + emqx_message:make(<<"t/opic">>, "test query") + ), + SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;", + Params = emqx_placeholder:proc_sql(Tokens, Event), + case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of + {ok, [{proc_result, 0, _Description}]} -> + ok; + {ok, [{proc_result, 6550, _Description}]} -> + %% Target table is not created + {error, undefined_table}; + Reason -> + {error, Reason} + end. to_bin(Bin) when is_binary(Bin) -> Bin; From 8430ec673ca36b004597536f13711707db41c5fb Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Wed, 17 May 2023 21:43:28 -0300 Subject: [PATCH 4/4] feat(kafka): check whether target topic exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- apps/emqx_bridge_kafka/rebar.config | 2 +- .../src/emqx_bridge_kafka_impl_producer.erl | 75 ++++++++++++++----- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 32 ++++++++ .../src/emqx_connector_mysql.erl | 11 ++- .../src/emqx_connector_pgsql.erl | 28 ++++--- apps/emqx_oracle/src/emqx_oracle.erl | 22 +++--- changes/ee/fix-10645.en.md | 1 + mix.exs | 6 +- 8 files changed, 127 insertions(+), 50 deletions(-) create mode 100644 changes/ee/fix-10645.en.md diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index cfe84a2a8..945ccbdba 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index e91cce600..be4c2d860 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -73,6 +73,12 @@ on_start(InstId, Config) -> sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, + case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of + unhealthy_target -> + throw(unhealthy_target); + _ -> + ok + end, case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -108,7 +114,9 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, producers => Producers, resource_id => ResourceId, - sync_query_timeout => SyncQueryTimeout + sync_query_timeout => SyncQueryTimeout, + hosts => Hosts, + kafka_config => KafkaConfig }}; {error, Reason2} -> ?SLOG(error, #{ @@ -131,6 +139,7 @@ on_start(InstId, Config) -> client_id => ClientId } ), + throw( "Failed to start Kafka client. Please check the logs for errors and check" " the connection parameters." @@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. -on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> +on_get_status(_InstId, #{client_id := ClientId} = State) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - do_get_status(Pid, KafkaTopic); + case do_get_status(Pid, State) of + ok -> connected; + unhealthy_target -> {disconnected, State, unhealthy_target}; + error -> connecting + end; {error, _Reason} -> connecting end. -do_get_status(Client, KafkaTopic) -> - %% TODO: add a wolff_producers:check_connectivity +do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) -> + case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of + unhealthy_target -> + unhealthy_target; + _ -> + case do_get_healthy_leaders(Client, KafkaTopic) of + [] -> error; + _ -> ok + end + end. + +do_get_healthy_leaders(Client, KafkaTopic) -> case wolff_client:get_leader_connections(Client, KafkaTopic) of {ok, Leaders} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable - case - lists:any( - fun({_Partition, Pid}) -> - is_pid(Pid) andalso erlang:is_process_alive(Pid) - end, - Leaders - ) - of - true -> - connected; - false -> - connecting - end; + %% Kafka is considered healthy as long as any of the partition leader is reachable. + lists:filtermap( + fun({_Partition, Pid}) -> + case is_pid(Pid) andalso erlang:is_process_alive(Pid) of + true -> {true, Pid}; + _ -> false + end + end, + Leaders + ); {error, _} -> - connecting + [] + end. + +do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) -> + CheckTopicFun = + fun() -> + wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) + end, + try + case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of + ok -> ok; + {error, unknown_topic_or_partition} -> unhealthy_target; + _ -> error + end + catch + _:_ -> + error end. ssl(#{enable := true} = SSL) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 6b7b961f6..6031c21cb 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) -> delete_all_bridges(), ok. +t_table_removed(_Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + KafkaTopic = "undefined-test-topic", + Conf = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( + Type, erlang:list_to_atom(Name), Conf + ), + ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name}, + ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)), + ok = emqx_bridge_resource:remove(BridgeId), + delete_all_bridges(), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 4a3ad1f3b..2931a8ec5 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -222,12 +222,11 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; + {error, {undefined_table, NState}} -> + {disconnected, NState, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> - %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + connecting end; false -> connecting @@ -251,7 +250,7 @@ do_check_prepares( {ok, Conn} -> case mysql:prepare(Conn, get_status, SQL) of {error, {1146, _, _}} -> - {undefined_table, State}; + {error, {undefined_table, State}}; {ok, Statement} -> mysql:unprepare(Conn, Statement); _ -> @@ -276,7 +275,7 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, {ok, State#{prepare_statement => Prepares}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_statement => {error, Prepares}}}; + {error, {undefined_table, State#{prepare_statement => {error, Prepares}}}}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index e0d3e834f..71d18f4a8 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -271,12 +271,12 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; - false -> - %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> + {error, {undefined_table, NState}} -> %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + {disconnected, NState, unhealthy_target}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting end; false -> connecting @@ -296,10 +296,14 @@ do_check_prepares( lists:foldl( fun (WorkerPid, ok) -> - {ok, Conn} = ecpool_worker:client(WorkerPid), - case epgsql:parse2(Conn, "get_status", SQL, []) of - {error, {_, _, _, undefined_table, _, _}} -> - {undefined_table, State}; + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case epgsql:parse2(Conn, "get_status", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {error, {undefined_table, State}}; + _ -> + ok + end; _ -> ok end; @@ -319,9 +323,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_sql => {error, Prepares}}}; - _Error -> - false + {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; + Error -> + {error, Error} end. %% =================================================================== diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 2d190057c..5a7f8d752 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -237,12 +237,12 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; + {error, {undefined_table, NState}} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> - %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + connecting end; false -> disconnected @@ -263,10 +263,14 @@ do_check_prepares( lists:foldl( fun (WorkerPid, ok) -> - {ok, Conn} = ecpool_worker:client(WorkerPid), - case check_if_table_exists(Conn, SQL, Tokens) of - {error, undefined_table} -> {undefined_table, State}; - _ -> ok + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case check_if_table_exists(Conn, SQL, Tokens) of + {error, undefined_table} -> {error, {undefined_table, State}}; + _ -> ok + end; + _ -> + ok end; (_, Acc) -> Acc @@ -283,7 +287,7 @@ do_check_prepares( {ok, State#{prepare_sql => Sts}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_sql => {error, Prepares}}}; + {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; {error, _Reason} = Error -> Error end. diff --git a/changes/ee/fix-10645.en.md b/changes/ee/fix-10645.en.md new file mode 100644 index 000000000..e97bb1c74 --- /dev/null +++ b/changes/ee/fix-10645.en.md @@ -0,0 +1 @@ +Changes health check for Oracle Database, PostgreSql, MySql and Kafka Producer data bridges to ensure target table/topic exists. diff --git a/mix.exs b/mix.exs index 89354ea92..d76cd8f60 100644 --- a/mix.exs +++ b/mix.exs @@ -95,7 +95,9 @@ defmodule EMQXUmbrella.MixProject do github: "emqx/ranch", ref: "de8ba2a00817c0a6eb1b8f20d6fb3e44e2c9a5aa", override: true}, # in conflict by grpc and eetcd {:gpb, "4.19.7", override: true, runtime: false}, - {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true} + {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}, + # set by hackney (dependency) + {:ssl_verify_fun, "1.1.6", override: true} ] ++ emqx_apps(profile_info, version) ++ enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() @@ -194,7 +196,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},