Merge pull request #9638 from lafirest/fix/mysql_dup

fix(mysql): fix the problem of data loss and bad match when mysql is disconnected
This commit is contained in:
lafirest 2023-01-02 23:15:44 +08:00 committed by GitHub
commit 7985cd3536
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 10 deletions

View File

@ -408,20 +408,34 @@ on_sql_query(
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta), ?TRACE("QUERY", "mysql_connector_received", LogMeta),
Worker = ecpool:get_client(PoolName), Worker = ecpool:get_client(PoolName),
{ok, Conn} = ecpool_worker:client(Worker), case ecpool_worker:client(Worker) of
?tp( {ok, Conn} ->
mysql_connector_send_query, ?tp(
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data} mysql_connector_send_query,
), #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
),
do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta);
{error, disconnected} ->
?SLOG(
error,
LogMeta#{
msg => "mysql_connector_do_sql_query_failed",
reason => worker_is_disconnected
}
),
{error, {recoverable_error, disconnected}}
end.
do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
{error, disconnected} = Result -> {error, disconnected} ->
?SLOG( ?SLOG(
error, error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
), ),
%% kill the poll worker to trigger reconnection %% kill the poll worker to trigger reconnection
_ = exit(Conn, restart), _ = exit(Conn, restart),
Result; {error, {recoverable_error, disconnected}};
{error, not_prepared} = Error -> {error, not_prepared} = Error ->
?tp( ?tp(
mysql_connector_prepare_query_failed, mysql_connector_prepare_query_failed,

View File

@ -10,3 +10,5 @@
## Bug Fixes ## Bug Fixes
- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641) - Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641)
- Fix the problem of data loss and bad match when the MySQL driver is disconnected [#9638](https://github.com/emqx/emqx/pull/9638).

View File

@ -10,3 +10,5 @@
## 修复 ## 修复
- 修复了测试GCP PubSub可能泄露内存的问题以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640) - 修复了测试GCP PubSub可能泄露内存的问题以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640)
- 修复 MySQL 驱动断开连接时出现的数据丢失和匹配错误的问题 [#9638](https://github.com/emqx/emqx/pull/9638)。

View File

@ -404,9 +404,13 @@ t_write_failure(Config) ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
send_message(Config, SentData) send_message(Config, SentData)
end), end),
fun(Result, _Trace) -> fun
?assertMatch({error, {resource_error, _}}, Result), ({error, {resource_error, _}}, _Trace) ->
ok ok;
({error, {recoverable_error, disconnected}}, _Trace) ->
ok;
(_, _Trace) ->
?assert(false)
end end
), ),
ok. ok.