fix(mqttconn): unify error interpretation in sync/async modes

Also move this logic to the mqtt connector itself, in order to avoid
dealing with extra callback layer.
This commit is contained in:
Andrew Mayorov 2023-05-22 17:24:11 +03:00
parent 67d703f8c5
commit 6967f621d8
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 27 additions and 42 deletions

View File

@ -15,6 +15,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_mqtt). -module(emqx_connector_mqtt).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-behaviour(supervisor). -behaviour(supervisor).
@ -141,12 +142,8 @@ on_stop(ResourceId, #{}) ->
on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) -> on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
case emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config) of Result = emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config),
ok -> handle_send_result(Result).
ok;
{error, Reason} ->
classify_error(Reason)
end.
on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) -> on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
@ -156,16 +153,12 @@ on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, con
ok; ok;
{ok, Pid} -> {ok, Pid} ->
{ok, Pid}; {ok, Pid};
{error, Reason} -> {error, _} = Error ->
classify_error(Reason) handle_send_result(Error)
end. end.
on_async_result(Callback, ok) -> on_async_result(Callback, Result) ->
apply_callback_function(Callback, ok); apply_callback_function(Callback, handle_send_result(Result)).
on_async_result(Callback, {ok, _} = Ok) ->
apply_callback_function(Callback, Ok);
on_async_result(Callback, {error, Reason}) ->
apply_callback_function(Callback, classify_error(Reason)).
apply_callback_function(F, Result) when is_function(F) -> apply_callback_function(F, Result) when is_function(F) ->
erlang:apply(F, [Result]); erlang:apply(F, [Result]);
@ -177,16 +170,30 @@ apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(
on_get_status(_ResourceId, #{worker := Pid}) -> on_get_status(_ResourceId, #{worker := Pid}) ->
emqx_connector_mqtt_worker:status(Pid). emqx_connector_mqtt_worker:status(Pid).
handle_send_result(ok) ->
ok;
handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) ->
ok;
handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) ->
ok;
handle_send_result({ok, Reply}) ->
{error, classify_reply(Reply)};
handle_send_result({error, Reason}) ->
{error, classify_error(Reason)}.
classify_reply(Reply = #{reason_code := _}) ->
{unrecoverable_error, Reply}.
classify_error(disconnected = Reason) -> classify_error(disconnected = Reason) ->
{error, {recoverable_error, Reason}}; {recoverable_error, Reason};
classify_error({disconnected, _RC, _} = Reason) -> classify_error({disconnected, _RC, _} = Reason) ->
{error, {recoverable_error, Reason}}; {recoverable_error, Reason};
classify_error({shutdown, _} = Reason) -> classify_error({shutdown, _} = Reason) ->
{error, {recoverable_error, Reason}}; {recoverable_error, Reason};
classify_error(shutdown = Reason) -> classify_error(shutdown = Reason) ->
{error, {recoverable_error, Reason}}; {recoverable_error, Reason};
classify_error(Reason) -> classify_error(Reason) ->
{error, {unrecoverable_error, Reason}}. {unrecoverable_error, Reason}.
make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 -> make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 ->
Subscriptions; Subscriptions;

View File

@ -285,29 +285,7 @@ send_to_remote(Pid, MsgIn, Conf) ->
do_send(Pid, export_msg(MsgIn, Conf)). do_send(Pid, export_msg(MsgIn, Conf)).
do_send(Pid, Msg) when Msg /= undefined -> do_send(Pid, Msg) when Msg /= undefined ->
case emqtt:publish(Pid, Msg) of emqtt:publish(Pid, Msg);
ok ->
ok;
{ok, #{reason_code := RC}} when
RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
->
ok;
{ok, #{reason_code := RC, reason_code_name := Reason}} ->
?SLOG(warning, #{
msg => "remote_publish_failed",
message => Msg,
reason_code => RC,
reason_code_name => Reason
}),
{error, Reason};
{error, Reason} ->
?SLOG(info, #{
msg => "client_failed",
reason => Reason
}),
{error, Reason}
end;
do_send(_Name, undefined) -> do_send(_Name, undefined) ->
ok. ok.