diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 9c40919f2..4a3ad1f3b 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -224,7 +224,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {connected, NState}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> connecting @@ -233,7 +236,37 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> do_get_status(Conn) -> ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). -do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> +do_check_prepares( + #{ + pool_name := PoolName, + prepare_statement := #{send_message := SQL} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case mysql:prepare(Conn, get_status, SQL) of + {error, {1146, _, _}} -> + {undefined_table, State}; + {ok, Statement} -> + mysql:unprepare(Conn, Statement); + _ -> + ok + end; + _ -> + ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); +do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) -> ok; do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) -> %% retry to prepare @@ -241,6 +274,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, ok -> %% remove the error {ok, State#{prepare_statement => Prepares}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_statement => {error, Prepares}}}; {error, Reason} -> {error, Reason} end. @@ -320,6 +356,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> {ok, _Key} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); + {error, {1146, _, _} = Reason} -> + %% Target table is not created + ?tp(mysql_undefined_table, #{}), + ?SLOG(error, LogMeta#{result => failed, reason => Reason}), + {error, undefined_table}; {error, Reason} -> % FIXME: we should try to differ on transient failers and % syntax failures. Retrying syntax failures is not very productive. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 4af180a2f..d8489b1b9 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -110,6 +110,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + connect_and_create_table(Config), connect_and_clear_table(Config), delete_bridge(Config), snabbkaffe:start_trace(), @@ -241,6 +242,12 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 500}). +sync_query_resource(Config, Request) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + query_resource_async(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), @@ -480,6 +487,7 @@ t_write_timeout(Config) -> ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), {ok, _} = create_bridge(Config), + connect_and_create_table(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, Timeout = 1000, @@ -641,6 +649,7 @@ t_workload_fits_prepared_statement_limit(Config) -> ). t_unprepared_statement_query(Config) -> + ok = connect_and_create_table(Config), ?assertMatch( {ok, _}, create_bridge(Config) @@ -668,6 +677,7 @@ t_unprepared_statement_query(Config) -> %% Test doesn't work with batch enabled since batch doesn't use %% prepared statements as such; it has its own query generation process t_uninitialized_prepared_statement(Config) -> + connect_and_create_table(Config), ?assertMatch( {ok, _}, create_bridge(Config) @@ -705,3 +715,64 @@ t_uninitialized_prepared_statement(Config) -> end ), ok. + +t_missing_table(Config) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?check_trace( + begin + connect_and_drop_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status == connecting orelse Status == disconnected, + emqx_resource_manager:health_check(ResourceID) + ) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, {resource_error, #{reason := unhealthy_target}}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)), + ok + end + ). + +t_table_removed(Config) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?check_trace( + begin + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, + {unrecoverable_error, + {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, + sync_query_resource(Config, {send_message, SentData, [], Timeout}) + ), + ok + end, + [] + ), + ok.