From d6ceeb3b30f099378525cd6dca3d813dde4b4c55 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 16:38:35 +0200 Subject: [PATCH] feat: add stop after render and after render trace to rabbitmq action --- .../src/emqx_bridge_rabbitmq_connector.erl | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 1ef1c6617..1637743b5 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -41,7 +41,7 @@ -export([connect/1]). %% Internal callbacks --export([publish_messages/4]). +-export([publish_messages/5]). namespace() -> "rabbitmq". @@ -214,9 +214,12 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]}, no_handover ), handle_result(Res); @@ -234,9 +237,12 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]}, no_handover ), handle_result(Res); @@ -244,6 +250,22 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} end. +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + publish_messages( Conn, RabbitMQ, @@ -255,7 +277,8 @@ publish_messages( wait_for_publish_confirmations := WaitForPublishConfirmations, publish_confirmation_timeout := PublishConfirmationTimeout }, - Messages + Messages, + TraceRenderedFunc ) -> try publish_messages( @@ -267,15 +290,18 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + TraceRenderedFunc ) catch + error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + {error, Reason}; %% if send a message to a non-existent exchange, RabbitMQ client will crash %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} %% so we catch and return {recoverable_error, Reason} to increase metrics _Type:Reason -> Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])), - erlang:error({recoverable_error, Msg}) + {error, {recoverable_error, Msg}} end. publish_messages( @@ -287,7 +313,8 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + {TraceRenderedFun, TraceRenderedFuncCTX} ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> @@ -299,18 +326,36 @@ publish_messages( exchange = Exchange, routing_key = RoutingKey }, + FormattedMsgs = [ + format_data(PayloadTmpl, M) + || {_, M} <- Messages + ], + TraceRenderedFun( + #{ + messages => FormattedMsgs, + properties => #{ + headers => [], + delivery_mode => DeliveryMode + }, + method => #{ + exchange => Exchange, + routing_key => RoutingKey + } + }, + TraceRenderedFuncCTX + ), lists:foreach( - fun({_, MsgRaw}) -> + fun(Msg) -> amqp_channel:cast( Channel, Method, #amqp_msg{ - payload = format_data(PayloadTmpl, MsgRaw), + payload = Msg, props = MessageProperties } ) end, - Messages + FormattedMsgs ), case WaitForPublishConfirmations of true ->