diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 900f6143f..d61950513 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -264,7 +264,7 @@ on_query( ), Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig])); + handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig])); on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", @@ -283,7 +283,7 @@ on_query_async( Callback = {fun on_async_result/2, [CallbackIn]}, Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]), + Result = with_egress_client(ChannelId, PoolName, send_async, [Msg, Callback, ChannelConfig]), case Result of ok -> ok; @@ -300,8 +300,25 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> reason => "Egress is not configured" }). -with_egress_client(ResourceId, Fun, Args) -> - ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). +with_egress_client(ActionID, ResourceId, Fun, Args) -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + ecpool:pick_and_do( + ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover + ). + +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = logger:get_process_metadata(), + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -322,7 +339,9 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> handle_send_result({ok, Reply}) -> {error, classify_reply(Reply)}; handle_send_result({error, Reason}) -> - {error, classify_error(Reason)}. + {error, classify_error(Reason)}; +handle_send_result({unrecoverable_error, Reason}) -> + {error, {unrecoverable_error, Reason}}. classify_reply(Reply = #{reason_code := _}) -> {unrecoverable_error, Reply}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index d23899ef1..80d38bc78 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -22,13 +22,16 @@ -export([ config/1, - send/3, - send_async/4 + send/4, + send_async/5 ]). -type message() :: emqx_types:message() | map(). -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type remote_message() :: #mqtt_msg{}. +-type trace_rendered_func() :: { + fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map() +}. -type egress() :: #{ local => #{ @@ -42,25 +45,37 @@ config(#{remote := RC = #{}} = Conf) -> Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. --spec send(pid(), message(), egress()) -> ok. -send(Pid, MsgIn, Egress) -> - emqtt:publish(Pid, export_msg(MsgIn, Egress)). +-spec send(pid(), trace_rendered_func(), message(), egress()) -> ok. +send(Pid, TraceRenderedFunc, MsgIn, Egress) -> + try + emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc)) + catch + error:{unrecoverable_error, Reason} -> + {unrecoverable_error, Reason} + end. --spec send_async(pid(), message(), callback(), egress()) -> +-spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) -> ok | {ok, pid()}. -send_async(Pid, MsgIn, Callback, Egress) -> - ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), - {ok, Pid}. +send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) -> + try + ok = emqtt:publish_async( + Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback + ), + {ok, Pid} + catch + error:{unrecoverable_error, Reason} -> + {unrecoverable_error, Reason} + end. -export_msg(Msg, #{remote := Remote}) -> - to_remote_msg(Msg, Remote). +export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) -> + to_remote_msg(Msg, Remote, TraceRenderedFunc). --spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> +-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) -> remote_message(). -to_remote_msg(#message{flags = Flags} = Msg, Vars) -> +to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); -to_remote_msg(Msg = #{}, Remote) -> + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc); +to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> #{ topic := Topic, payload := Payload, @@ -68,6 +83,16 @@ to_remote_msg(Msg = #{}, Remote) -> retain := Retain } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), + TraceRenderedFun( + #{ + qos => QoS, + retain => Retain, + topic => Topic, + props => PubProps, + payload => Payload + }, + TraceRenderedCTX + ), #mqtt_msg{ qos = QoS, retain = Retain,