feat(mqtt-bridge): report recoverable errors of async queries

This should help to avoid delivery failures of messages which could
be safely retried, in the event of intermittent connectivity loss
for example.

It should now be safe since 73d5592b.
This commit is contained in:
Andrew Mayorov 2023-02-08 21:02:52 +03:00
parent d1eb788ee1
commit 8bd7c0eb3f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 20 additions and 1 deletions

View File

@ -45,6 +45,8 @@
on_get_status/2 on_get_status/2
]). ]).
-export([on_async_result/2]).
-behaviour(hocon_schema). -behaviour(hocon_schema).
-import(hoconsc, [mk/2]). -import(hoconsc, [mk/2]).
@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
classify_error(Reason) classify_error(Reason)
end. end.
on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
Callback = {fun on_async_result/2, [CallbackIn]},
case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
ok -> ok ->
ok; ok;
@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
classify_error(Reason) classify_error(Reason)
end. end.
on_async_result(Callback, ok) ->
apply_callback_function(Callback, ok);
on_async_result(Callback, {ok, _} = Ok) ->
apply_callback_function(Callback, Ok);
on_async_result(Callback, {error, Reason}) ->
apply_callback_function(Callback, classify_error(Reason)).
apply_callback_function(F, Result) when is_function(F) ->
erlang:apply(F, [Result]);
apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
erlang:apply(F, A ++ [Result]);
apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
erlang:apply(M, F, A ++ [Result]).
on_get_status(_InstId, #{name := InstanceId}) -> on_get_status(_InstId, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:status(InstanceId). emqx_connector_mqtt_worker:status(InstanceId).
@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) ->
{error, {recoverable_error, Reason}}; {error, {recoverable_error, Reason}};
classify_error({shutdown, _} = Reason) -> classify_error({shutdown, _} = Reason) ->
{error, {recoverable_error, Reason}}; {error, {recoverable_error, Reason}};
classify_error(shutdown = Reason) ->
{error, {recoverable_error, Reason}};
classify_error(Reason) -> classify_error(Reason) ->
{error, {unrecoverable_error, Reason}}. {error, {unrecoverable_error, Reason}}.