From 4e211c12d3db5e074313c6b6dcfc481f4964471d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 14 Sep 2022 16:15:10 +0800 Subject: [PATCH] fix(mqtt_bridge): return value of sending messages was discarded --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 6 ++---- .../emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl | 2 +- apps/emqx_resource/src/emqx_resource_worker.erl | 7 ++++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index f52edab15..b063d7436 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -187,8 +187,7 @@ on_stop(_InstId, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), - ok. + emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg). on_query_async( _InstId, @@ -197,8 +196,7 @@ on_query_async( #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}), - ok. + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5f81e68e1..e1651f114 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -342,7 +342,7 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, {call, From}, Req, _State) -> - {keep_state_and_data, [{reply, From, {unsuppored_request, Req}}]}; + {keep_state_and_data, [{reply, From, {error, {unsuppored_request, Req}}}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(StateName, Type, Content, #{name := Name} = State) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index f683b67ed..05292944b 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -585,7 +585,12 @@ assert_ok_result(ok) -> assert_ok_result({async_return, R}) -> assert_ok_result(R); assert_ok_result(R) when is_tuple(R) -> - ok = erlang:element(1, R); + try + ok = erlang:element(1, R) + catch + error:{badmatch, _} -> + error({not_ok_result, R}) + end; assert_ok_result(R) -> error({not_ok_result, R}).