feat: add stop after render and after render trace to rabbitmq action

This commit is contained in:
Kjell Winblad 2024-04-24 16:38:35 +02:00
parent 11d9d30fc0
commit d6ceeb3b30
1 changed files with 55 additions and 10 deletions

View File

@ -41,7 +41,7 @@
-export([connect/1]). -export([connect/1]).
%% Internal callbacks %% Internal callbacks
-export([publish_messages/4]). -export([publish_messages/5]).
namespace() -> "rabbitmq". namespace() -> "rabbitmq".
@ -214,9 +214,12 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->
#{channels := Channels} = State, #{channels := Channels} = State,
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
Res = ecpool:pick_and_do( Res = ecpool:pick_and_do(
ResourceID, ResourceID,
{?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]}, {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]},
no_handover no_handover
), ),
handle_result(Res); handle_result(Res);
@ -234,9 +237,12 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
#{channels := Channels} = State, #{channels := Channels} = State,
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
Res = ecpool:pick_and_do( Res = ecpool:pick_and_do(
ResourceID, ResourceID,
{?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]}, {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]},
no_handover no_handover
), ),
handle_result(Res); handle_result(Res);
@ -244,6 +250,22 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
{error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
end. end.
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
publish_messages( publish_messages(
Conn, Conn,
RabbitMQ, RabbitMQ,
@ -255,7 +277,8 @@ publish_messages(
wait_for_publish_confirmations := WaitForPublishConfirmations, wait_for_publish_confirmations := WaitForPublishConfirmations,
publish_confirmation_timeout := PublishConfirmationTimeout publish_confirmation_timeout := PublishConfirmationTimeout
}, },
Messages Messages,
TraceRenderedFunc
) -> ) ->
try try
publish_messages( publish_messages(
@ -267,15 +290,18 @@ publish_messages(
PayloadTmpl, PayloadTmpl,
Messages, Messages,
WaitForPublishConfirmations, WaitForPublishConfirmations,
PublishConfirmationTimeout PublishConfirmationTimeout,
TraceRenderedFunc
) )
catch catch
error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason ->
{error, Reason};
%% if send a message to a non-existent exchange, RabbitMQ client will crash %% if send a message to a non-existent exchange, RabbitMQ client will crash
%% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>}
%% so we catch and return {recoverable_error, Reason} to increase metrics %% so we catch and return {recoverable_error, Reason} to increase metrics
_Type:Reason -> _Type:Reason ->
Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])), Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])),
erlang:error({recoverable_error, Msg}) {error, {recoverable_error, Msg}}
end. end.
publish_messages( publish_messages(
@ -287,7 +313,8 @@ publish_messages(
PayloadTmpl, PayloadTmpl,
Messages, Messages,
WaitForPublishConfirmations, WaitForPublishConfirmations,
PublishConfirmationTimeout PublishConfirmationTimeout,
{TraceRenderedFun, TraceRenderedFuncCTX}
) -> ) ->
case maps:find(Conn, RabbitMQ) of case maps:find(Conn, RabbitMQ) of
{ok, Channel} -> {ok, Channel} ->
@ -299,18 +326,36 @@ publish_messages(
exchange = Exchange, exchange = Exchange,
routing_key = RoutingKey routing_key = RoutingKey
}, },
FormattedMsgs = [
format_data(PayloadTmpl, M)
|| {_, M} <- Messages
],
TraceRenderedFun(
#{
messages => FormattedMsgs,
properties => #{
headers => [],
delivery_mode => DeliveryMode
},
method => #{
exchange => Exchange,
routing_key => RoutingKey
}
},
TraceRenderedFuncCTX
),
lists:foreach( lists:foreach(
fun({_, MsgRaw}) -> fun(Msg) ->
amqp_channel:cast( amqp_channel:cast(
Channel, Channel,
Method, Method,
#amqp_msg{ #amqp_msg{
payload = format_data(PayloadTmpl, MsgRaw), payload = Msg,
props = MessageProperties props = MessageProperties
} }
) )
end, end,
Messages FormattedMsgs
), ),
case WaitForPublishConfirmations of case WaitForPublishConfirmations of
true -> true ->