diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3ae4e5820..3518190c7 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -80,26 +80,35 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - lists:foreach(fun (Id) -> - send_message(Id, emqx_rule_events:eventmsg_publish(Message)) - end, get_matched_bridges(Topic)); + Msg = emqx_rule_events:eventmsg_publish(Message), + send_to_egress_matched_bridges(Topic, Msg); true -> ok end, {ok, Message}. +send_to_egress_matched_bridges(Topic, Msg) -> + lists:foreach(fun (Id) -> + try send_message(Id, Msg) of + ok -> ok; + Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed", + bridge => Id, error => Error}) + catch Err:Reason:ST -> + ?SLOG(error, #{msg => "send_message_to_bridge_crash", + bridge => Id, error => Err, reason => Reason, + stacktrace => ST}) + end + end, get_matched_bridges(Topic)). + send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of not_found -> - throw({bridge_not_found, BridgeId}); + {error, {bridge_not_found, BridgeId}}; #{enable := true} -> - case emqx_resource:query(ResId, {send_message, Message}) of - {error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason}); - Result -> Result - end; + emqx_resource:query(ResId, {send_message, Message}); #{enable := false} -> - throw({bridge_stopped, BridgeId}) + {error, {bridge_stopped, BridgeId}} end. config_key_path() ->