From 8bd7c0eb3f64ad7bf5f89613c17f6b61aebbeb96 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 8 Feb 2023 21:02:52 +0300 Subject: [PATCH 1/2] feat(mqtt-bridge): report recoverable errors of async queries This should help to avoid delivery failures of messages which could be safely retried, in the event of intermittent connectivity loss for example. It should now be safe since 73d5592b. --- .../src/emqx_connector_mqtt.erl | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cffd138b5..3f067dc06 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -45,6 +45,8 @@ on_get_status/2 ]). +-export([on_async_result/2]). + -behaviour(hocon_schema). -import(hoconsc, [mk/2]). @@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> classify_error(Reason) end. -on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> +on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + Callback = {fun on_async_result/2, [CallbackIn]}, case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> ok; @@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> classify_error(Reason) end. +on_async_result(Callback, ok) -> + apply_callback_function(Callback, ok); +on_async_result(Callback, {ok, _} = Ok) -> + apply_callback_function(Callback, Ok); +on_async_result(Callback, {error, Reason}) -> + apply_callback_function(Callback, classify_error(Reason)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + on_get_status(_InstId, #{name := InstanceId}) -> emqx_connector_mqtt_worker:status(InstanceId). @@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) -> {error, {recoverable_error, Reason}}; classify_error({shutdown, _} = Reason) -> {error, {recoverable_error, Reason}}; +classify_error(shutdown = Reason) -> + {error, {recoverable_error, Reason}}; classify_error(Reason) -> {error, {unrecoverable_error, Reason}}. From 1b195413c32f7769be9919802f0135c062072e1a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 Feb 2023 11:47:49 +0300 Subject: [PATCH 2/2] chore: add changelog entry --- changes/v5.0.17/fix-9938.en.md | 1 + changes/v5.0.17/fix-9938.zh.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 changes/v5.0.17/fix-9938.en.md create mode 100644 changes/v5.0.17/fix-9938.zh.md diff --git a/changes/v5.0.17/fix-9938.en.md b/changes/v5.0.17/fix-9938.en.md new file mode 100644 index 000000000..54de76fd7 --- /dev/null +++ b/changes/v5.0.17/fix-9938.en.md @@ -0,0 +1 @@ +Report some egress MQTT bridge errors as recoverable, and thus retryable. diff --git a/changes/v5.0.17/fix-9938.zh.md b/changes/v5.0.17/fix-9938.zh.md new file mode 100644 index 000000000..f8bd41cf0 --- /dev/null +++ b/changes/v5.0.17/fix-9938.zh.md @@ -0,0 +1 @@ +将一些出口 MQTT 网桥错误报告为可恢复,因此可重试。