diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 7391028ee..c1692b9af 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -220,15 +220,14 @@ send_to_matched_egress_bridges(Topic, Msg) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - send_message(BridgeType, BridgeName, ResId, Message, undefined). + send_message(BridgeType, BridgeName, ResId, Message, #{}). -send_message(BridgeType, BridgeName, ResId, Message, ReplyTo) -> +send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) -> case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of not_found -> {error, bridge_not_found}; #{enable := true} = Config -> - QueryOpts0 = query_opts(Config), - QueryOpts = QueryOpts0#{reply_to => ReplyTo}, + QueryOpts = maps:merge(query_opts(Config), QueryOpts0), emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> {error, bridge_stopped} diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 7b981b781..812d30630 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -351,7 +351,9 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} ), ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]}, - case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, ReplyTo) of + case + emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo}) + of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); Result ->