diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 8c39c3671..0cddfab66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -196,6 +196,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> {error, channel_not_found}; {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + sync_timeout => SyncTimeout, + is_async => false + }), try pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) catch @@ -217,12 +222,16 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> ?tp_span( pulsar_producer_on_query_async, #{instance_id => _InstanceId, message => Message}, - on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) + on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. -on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) -> +on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + is_async => true + }), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). %%-------------------------------------------------------------------------------------