diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 686d524dc..e52ac50b0 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -399,7 +399,12 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> case emqx_packet:check(Packet) of ok -> - emqx_external_trace:trace_process_publish(Packet, Channel, fun process_publish/2); + emqx_external_trace:trace_process_publish( + Packet, + %% More info can be added in future, but for now only clientid is used + trace_info(Channel), + fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end + ); {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; @@ -924,7 +929,7 @@ handle_deliver( NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), {ok, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel) -> - Delivers1 = emqx_external_trace:start_trace_send(Delivers, Channel), + Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)), do_handle_deliver(Delivers1, Channel). do_handle_deliver( @@ -1435,6 +1440,10 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) -> emqx_olp:backoff(Zone), ok. +trace_info(Channel) -> + %% More info can be added in future, but for now only clientid is used + maps:from_list(info([clientid], Channel)). + %%-------------------------------------------------------------------- %% Enrich MQTT Connect Info diff --git a/apps/emqx/src/emqx_external_trace.erl b/apps/emqx/src/emqx_external_trace.erl index b03e7139a..1a7df93d0 100644 --- a/apps/emqx/src/emqx_external_trace.erl +++ b/apps/emqx/src/emqx_external_trace.erl @@ -15,18 +15,20 @@ %%-------------------------------------------------------------------- -module(emqx_external_trace). --callback trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when +-callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when Packet :: emqx_types:packet(), - Channel :: emqx_channel:channel(), + ChannelInfo :: channel_info(), Res :: term(). --callback start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> +-callback start_trace_send(list(emqx_types:deliver()), channel_info()) -> list(emqx_types:deliver()). -callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. -callback event(EventName :: term(), Attributes :: term()) -> ok. +-type channel_info() :: #{atom() => _}. + -export([ provider/0, register_provider/1, @@ -38,6 +40,8 @@ event/2 ]). +-export_type([channel_info/0]). + -define(PROVIDER, {?MODULE, trace_provider}). -define(with_provider(IfRegistered, IfNotRegistered), @@ -79,17 +83,17 @@ provider() -> %% trace API %%-------------------------------------------------------------------- --spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when +-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when Packet :: emqx_types:packet(), - Channel :: emqx_channel:channel(), + ChannelInfo :: channel_info(), Res :: term(). -trace_process_publish(Packet, Channel, ProcessFun) -> - ?with_provider(?FUNCTION_NAME(Packet, Channel, ProcessFun), ProcessFun(Packet, Channel)). +trace_process_publish(Packet, ChannelInfo, ProcessFun) -> + ?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)). --spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> +-spec start_trace_send(list(emqx_types:deliver()), channel_info()) -> list(emqx_types:deliver()). -start_trace_send(Delivers, Channel) -> - ?with_provider(?FUNCTION_NAME(Delivers, Channel), Delivers). +start_trace_send(Delivers, ChannelInfo) -> + ?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers). -spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. end_trace_send(Packets) -> diff --git a/apps/emqx_opentelemetry/src/emqx_otel_trace.erl b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl index 061c3e983..a3c73f206 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_trace.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl @@ -94,17 +94,17 @@ stop() -> %% trace API %%-------------------------------------------------------------------- --spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when +-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when Packet :: emqx_types:packet(), - Channel :: emqx_channel:channel(), + ChannelInfo :: emqx_external_trace:channel_info(), Res :: term(). -trace_process_publish(Packet, Channel, ProcessFun) -> +trace_process_publish(Packet, ChannelInfo, ProcessFun) -> case maybe_init_ctx(Packet) of false -> - ProcessFun(Packet, Channel); + ProcessFun(Packet); RootCtx -> RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true), - Attrs = maps:merge(packet_attributes(Packet), channel_attributes(Channel)), + Attrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)), SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{ attributes => Attrs }), @@ -113,22 +113,22 @@ trace_process_publish(Packet, Channel, ProcessFun) -> Packet1 = put_ctx_to_packet(Ctx, Packet), _ = otel_ctx:attach(Ctx), try - ProcessFun(Packet1, Channel) + ProcessFun(Packet1) after _ = ?end_span(), clear() end end. --spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> +-spec start_trace_send(list(emqx_types:deliver()), emqx_external_trace:channel_info()) -> list(emqx_types:deliver()). -start_trace_send(Delivers, Channel) -> +start_trace_send(Delivers, ChannelInfo) -> lists:map( fun({deliver, Topic, Msg} = Deliver) -> case get_ctx_from_msg(Msg) of Ctx when is_map(Ctx) -> Attrs = maps:merge( - msg_attributes(Msg), sub_channel_attributes(Channel) + msg_attributes(Msg), sub_channel_attributes(ChannelInfo) ), StartOpts = #{attributes => Attrs}, SpanCtx = otel_tracer:start_span( @@ -216,11 +216,11 @@ msg_attributes(Msg) -> packet_attributes(#mqtt_packet{variable = Packet}) -> #{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}. -channel_attributes(Channel) -> - #{'messaging.client_id' => emqx_channel:info(clientid, Channel)}. +channel_attributes(ChannelInfo) -> + #{'messaging.client_id' => maps:get(clientid, ChannelInfo, undefined)}. -sub_channel_attributes(Channel) -> - channel_attributes(Channel). +sub_channel_attributes(ChannelInfo) -> + channel_attributes(ChannelInfo). put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) -> Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};