diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 65e732d1e..6c0ff7210 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -408,20 +408,34 @@ on_sql_query( LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), Worker = ecpool:get_client(PoolName), - {ok, Conn} = ecpool_worker:client(Worker), - ?tp( - mysql_connector_send_query, - #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} - ), + case ecpool_worker:client(Worker) of + {ok, Conn} -> + ?tp( + mysql_connector_send_query, + #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} + ), + do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta); + {error, disconnected} -> + ?SLOG( + error, + LogMeta#{ + msg => "mysql_connector_do_sql_query_failed", + reason => worker_is_disconnected + } + ), + {error, {recoverable_error, disconnected}} + end. + +do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of - {error, disconnected} = Result -> + {error, disconnected} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} ), %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), - Result; + {error, {recoverable_error, disconnected}}; {error, not_prepared} = Error -> ?tp( mysql_connector_prepare_query_failed, diff --git a/changes/v5.0.14-en.md b/changes/v5.0.14-en.md index 8e9b14e76..7ca8200b6 100644 --- a/changes/v5.0.14-en.md +++ b/changes/v5.0.14-en.md @@ -10,3 +10,5 @@ ## Bug Fixes - Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641) + +- Fix the problem of data loss and bad match when the MySQL driver is disconnected [#9638](https://github.com/emqx/emqx/pull/9638). diff --git a/changes/v5.0.14-zh.md b/changes/v5.0.14-zh.md index cbca3ad32..2667fabef 100644 --- a/changes/v5.0.14-zh.md +++ b/changes/v5.0.14-zh.md @@ -10,3 +10,5 @@ ## 修复 - 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640) + +- 修复 MySQL 驱动断开连接时出现的数据丢失和匹配错误的问题 [#9638](https://github.com/emqx/emqx/pull/9638)。 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 29fa02923..caace762a 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -404,9 +404,13 @@ t_write_failure(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> send_message(Config, SentData) end), - fun(Result, _Trace) -> - ?assertMatch({error, {resource_error, _}}, Result), - ok + fun + ({error, {resource_error, _}}, _Trace) -> + ok; + ({error, {recoverable_error, disconnected}}, _Trace) -> + ok; + (_, _Trace) -> + ?assert(false) end ), ok.