From 6967f621d8860f6a5e461bd0cb0df6e9ca72a465 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 22 May 2023 17:24:11 +0300 Subject: [PATCH] fix(mqttconn): unify error interpretation in sync/async modes Also move this logic to the mqtt connector itself, in order to avoid dealing with extra callback layer. --- .../src/emqx_connector_mqtt.erl | 45 +++++++++++-------- .../src/mqtt/emqx_connector_mqtt_worker.erl | 24 +--------- 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bd4bf6eb1..cc40b1606 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_connector_mqtt). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -behaviour(supervisor). @@ -141,12 +142,8 @@ on_stop(ResourceId, #{}) -> on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), - case emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config) of - ok -> - ok; - {error, Reason} -> - classify_error(Reason) - end. + Result = emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config), + handle_send_result(Result). on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), @@ -156,16 +153,12 @@ on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, con ok; {ok, Pid} -> {ok, Pid}; - {error, Reason} -> - classify_error(Reason) + {error, _} = Error -> + handle_send_result(Error) end. -on_async_result(Callback, ok) -> - apply_callback_function(Callback, ok); -on_async_result(Callback, {ok, _} = Ok) -> - apply_callback_function(Callback, Ok); -on_async_result(Callback, {error, Reason}) -> - apply_callback_function(Callback, classify_error(Reason)). +on_async_result(Callback, Result) -> + apply_callback_function(Callback, handle_send_result(Result)). apply_callback_function(F, Result) when is_function(F) -> erlang:apply(F, [Result]); @@ -177,16 +170,30 @@ apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list( on_get_status(_ResourceId, #{worker := Pid}) -> emqx_connector_mqtt_worker:status(Pid). +handle_send_result(ok) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> + ok; +handle_send_result({ok, Reply}) -> + {error, classify_reply(Reply)}; +handle_send_result({error, Reason}) -> + {error, classify_error(Reason)}. + +classify_reply(Reply = #{reason_code := _}) -> + {unrecoverable_error, Reply}. + classify_error(disconnected = Reason) -> - {error, {recoverable_error, Reason}}; + {recoverable_error, Reason}; classify_error({disconnected, _RC, _} = Reason) -> - {error, {recoverable_error, Reason}}; + {recoverable_error, Reason}; classify_error({shutdown, _} = Reason) -> - {error, {recoverable_error, Reason}}; + {recoverable_error, Reason}; classify_error(shutdown = Reason) -> - {error, {recoverable_error, Reason}}; + {recoverable_error, Reason}; classify_error(Reason) -> - {error, {unrecoverable_error, Reason}}. + {unrecoverable_error, Reason}. make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 -> Subscriptions; diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 4169c7f69..8e3ca3136 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -285,29 +285,7 @@ send_to_remote(Pid, MsgIn, Conf) -> do_send(Pid, export_msg(MsgIn, Conf)). do_send(Pid, Msg) when Msg /= undefined -> - case emqtt:publish(Pid, Msg) of - ok -> - ok; - {ok, #{reason_code := RC}} when - RC =:= ?RC_SUCCESS; - RC =:= ?RC_NO_MATCHING_SUBSCRIBERS - -> - ok; - {ok, #{reason_code := RC, reason_code_name := Reason}} -> - ?SLOG(warning, #{ - msg => "remote_publish_failed", - message => Msg, - reason_code => RC, - reason_code_name => Reason - }), - {error, Reason}; - {error, Reason} -> - ?SLOG(info, #{ - msg => "client_failed", - reason => Reason - }), - {error, Reason} - end; + emqtt:publish(Pid, Msg); do_send(_Name, undefined) -> ok.