From db5cdb674242164a6abc2562d21808d1ada12e45 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 31 Oct 2023 16:46:36 +0100 Subject: [PATCH] perf: no need to format event message if no bridge matched --- apps/emqx_bridge/src/emqx_bridge.erl | 74 +++++++++++++++------------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a3e06a819..8098072c0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -191,45 +191,50 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - {Msg, _} = emqx_rule_events:eventmsg_publish(Message), - send_to_matched_egress_bridges(Topic, Msg); + send_to_matched_egress_bridges(Topic, Message); true -> ok end, {ok, Message}. -send_to_matched_egress_bridges(Topic, Msg) -> - MatchedBridgeIds = get_matched_egress_bridges(Topic), - lists:foreach( - fun(Id) -> - try send_message(Id, Msg) of - {error, Reason} -> - ?SLOG(error, #{ - msg => "send_message_to_bridge_failed", - bridge => Id, - error => Reason - }); - _ -> - ok - catch - throw:Reason -> - ?SLOG(error, #{ - msg => "send_message_to_bridge_exception", - bridge => Id, - reason => emqx_utils:redact(Reason) - }); - Err:Reason:ST -> - ?SLOG(error, #{ - msg => "send_message_to_bridge_exception", - bridge => Id, - error => Err, - reason => emqx_utils:redact(Reason), - stacktrace => emqx_utils:redact(ST) - }) - end - end, - MatchedBridgeIds - ). +send_to_matched_egress_bridges(Topic, Message) -> + case get_matched_egress_bridges(Topic) of + [] -> + ok; + Ids -> + {Msg, _} = emqx_rule_events:eventmsg_publish(Message), + send_to_matched_egress_bridges_loop(Topic, Msg, Ids) + end. + +send_to_matched_egress_bridges_loop(_Topic, _Msg, []) -> + ok; +send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) -> + try send_message(Id, Msg) of + {error, Reason} -> + ?SLOG(error, #{ + msg => "send_message_to_bridge_failed", + bridge => Id, + error => Reason + }); + _ -> + ok + catch + throw:Reason -> + ?SLOG(error, #{ + msg => "send_message_to_bridge_exception", + bridge => Id, + reason => emqx_utils:redact(Reason) + }); + Err:Reason:ST -> + ?SLOG(error, #{ + msg => "send_message_to_bridge_exception", + bridge => Id, + error => Err, + reason => emqx_utils:redact(Reason), + stacktrace => emqx_utils:redact(ST) + }) + end, + send_to_matched_egress_bridges_loop(Topic, Msg, Ids). send_message(BridgeId, Message) -> {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), @@ -571,6 +576,7 @@ flatten_confs(Conf0) -> do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. +%% TODO: create a topic index for this get_matched_egress_bridges(Topic) -> Bridges = emqx:get_config([bridges], #{}), maps:fold(