diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5c62e7086..8f2d06517 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -45,6 +45,8 @@ on_get_status/2 ]). +-export([on_async_result/2]). + -behaviour(hocon_schema). -import(hoconsc, [mk/2]). @@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> classify_error(Reason) end. -on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> +on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + Callback = {fun on_async_result/2, [CallbackIn]}, case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> ok; @@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> classify_error(Reason) end. +on_async_result(Callback, ok) -> + apply_callback_function(Callback, ok); +on_async_result(Callback, {ok, _} = Ok) -> + apply_callback_function(Callback, Ok); +on_async_result(Callback, {error, Reason}) -> + apply_callback_function(Callback, classify_error(Reason)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + on_get_status(_InstId, #{name := InstanceId}) -> emqx_connector_mqtt_worker:status(InstanceId). @@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) -> {error, {recoverable_error, Reason}}; classify_error({shutdown, _} = Reason) -> {error, {recoverable_error, Reason}}; +classify_error(shutdown = Reason) -> + {error, {recoverable_error, Reason}}; classify_error(Reason) -> {error, {unrecoverable_error, Reason}}. diff --git a/changes/v5.0.17/fix-9938.en.md b/changes/v5.0.17/fix-9938.en.md new file mode 100644 index 000000000..54de76fd7 --- /dev/null +++ b/changes/v5.0.17/fix-9938.en.md @@ -0,0 +1 @@ +Report some egress MQTT bridge errors as recoverable, and thus retryable. diff --git a/changes/v5.0.17/fix-9938.zh.md b/changes/v5.0.17/fix-9938.zh.md new file mode 100644 index 000000000..f8bd41cf0 --- /dev/null +++ b/changes/v5.0.17/fix-9938.zh.md @@ -0,0 +1 @@ +将一些出口 MQTT 网桥错误报告为可恢复,因此可重试。