diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cffd138b5..3f067dc06 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -45,6 +45,8 @@ on_get_status/2 ]). +-export([on_async_result/2]). + -behaviour(hocon_schema). -import(hoconsc, [mk/2]). @@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> classify_error(Reason) end. -on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> +on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + Callback = {fun on_async_result/2, [CallbackIn]}, case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> ok; @@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> classify_error(Reason) end. +on_async_result(Callback, ok) -> + apply_callback_function(Callback, ok); +on_async_result(Callback, {ok, _} = Ok) -> + apply_callback_function(Callback, Ok); +on_async_result(Callback, {error, Reason}) -> + apply_callback_function(Callback, classify_error(Reason)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + on_get_status(_InstId, #{name := InstanceId}) -> emqx_connector_mqtt_worker:status(InstanceId). @@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) -> {error, {recoverable_error, Reason}}; classify_error({shutdown, _} = Reason) -> {error, {recoverable_error, Reason}}; +classify_error(shutdown = Reason) -> + {error, {recoverable_error, Reason}}; classify_error(Reason) -> {error, {unrecoverable_error, Reason}}.