From 8bd7c0eb3f64ad7bf5f89613c17f6b61aebbeb96 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 8 Feb 2023 21:02:52 +0300 Subject: [PATCH] feat(mqtt-bridge): report recoverable errors of async queries This should help to avoid delivery failures of messages which could be safely retried, in the event of intermittent connectivity loss for example. It should now be safe since 73d5592b. --- .../src/emqx_connector_mqtt.erl | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cffd138b5..3f067dc06 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -45,6 +45,8 @@ on_get_status/2 ]). +-export([on_async_result/2]). + -behaviour(hocon_schema). -import(hoconsc, [mk/2]). @@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> classify_error(Reason) end. -on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> +on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + Callback = {fun on_async_result/2, [CallbackIn]}, case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> ok; @@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> classify_error(Reason) end. +on_async_result(Callback, ok) -> + apply_callback_function(Callback, ok); +on_async_result(Callback, {ok, _} = Ok) -> + apply_callback_function(Callback, Ok); +on_async_result(Callback, {error, Reason}) -> + apply_callback_function(Callback, classify_error(Reason)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + on_get_status(_InstId, #{name := InstanceId}) -> emqx_connector_mqtt_worker:status(InstanceId). @@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) -> {error, {recoverable_error, Reason}}; classify_error({shutdown, _} = Reason) -> {error, {recoverable_error, Reason}}; +classify_error(shutdown = Reason) -> + {error, {recoverable_error, Reason}}; classify_error(Reason) -> {error, {unrecoverable_error, Reason}}.