diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 65e732d1e..6c0ff7210 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -408,20 +408,34 @@ on_sql_query( LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), Worker = ecpool:get_client(PoolName), - {ok, Conn} = ecpool_worker:client(Worker), - ?tp( - mysql_connector_send_query, - #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} - ), + case ecpool_worker:client(Worker) of + {ok, Conn} -> + ?tp( + mysql_connector_send_query, + #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} + ), + do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta); + {error, disconnected} -> + ?SLOG( + error, + LogMeta#{ + msg => "mysql_connector_do_sql_query_failed", + reason => worker_is_disconnected + } + ), + {error, {recoverable_error, disconnected}} + end. + +do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of - {error, disconnected} = Result -> + {error, disconnected} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} ), %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), - Result; + {error, {recoverable_error, disconnected}}; {error, not_prepared} = Error -> ?tp( mysql_connector_prepare_query_failed,