refactor(emqx_opentelemetry): pass only channel info to trace functions

This commit is contained in:
Serge Tupchii 2023-12-05 19:01:08 +02:00
parent 195a23ae27
commit 938508b270
3 changed files with 38 additions and 25 deletions

View File

@ -399,7 +399,12 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> ok ->
emqx_external_trace:trace_process_publish(Packet, Channel, fun process_publish/2); emqx_external_trace:trace_process_publish(
Packet,
%% More info can be added in future, but for now only clientid is used
trace_info(Channel),
fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel) handle_out(disconnect, ReasonCode, Channel)
end; end;
@ -924,7 +929,7 @@ handle_deliver(
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
{ok, Channel#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel) -> handle_deliver(Delivers, Channel) ->
Delivers1 = emqx_external_trace:start_trace_send(Delivers, Channel), Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
do_handle_deliver(Delivers1, Channel). do_handle_deliver(Delivers1, Channel).
do_handle_deliver( do_handle_deliver(
@ -1435,6 +1440,10 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
emqx_olp:backoff(Zone), emqx_olp:backoff(Zone),
ok. ok.
trace_info(Channel) ->
%% More info can be added in future, but for now only clientid is used
maps:from_list(info([clientid], Channel)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enrich MQTT Connect Info %% Enrich MQTT Connect Info

View File

@ -15,18 +15,20 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_external_trace). -module(emqx_external_trace).
-callback trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when -callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
Packet :: emqx_types:packet(), Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(), ChannelInfo :: channel_info(),
Res :: term(). Res :: term().
-callback start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> -callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
list(emqx_types:deliver()). list(emqx_types:deliver()).
-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. -callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
-callback event(EventName :: term(), Attributes :: term()) -> ok. -callback event(EventName :: term(), Attributes :: term()) -> ok.
-type channel_info() :: #{atom() => _}.
-export([ -export([
provider/0, provider/0,
register_provider/1, register_provider/1,
@ -38,6 +40,8 @@
event/2 event/2
]). ]).
-export_type([channel_info/0]).
-define(PROVIDER, {?MODULE, trace_provider}). -define(PROVIDER, {?MODULE, trace_provider}).
-define(with_provider(IfRegistered, IfNotRegistered), -define(with_provider(IfRegistered, IfNotRegistered),
@ -79,17 +83,17 @@ provider() ->
%% trace API %% trace API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when -spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
Packet :: emqx_types:packet(), Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(), ChannelInfo :: channel_info(),
Res :: term(). Res :: term().
trace_process_publish(Packet, Channel, ProcessFun) -> trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
?with_provider(?FUNCTION_NAME(Packet, Channel, ProcessFun), ProcessFun(Packet, Channel)). ?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)).
-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> -spec start_trace_send(list(emqx_types:deliver()), channel_info()) ->
list(emqx_types:deliver()). list(emqx_types:deliver()).
start_trace_send(Delivers, Channel) -> start_trace_send(Delivers, ChannelInfo) ->
?with_provider(?FUNCTION_NAME(Delivers, Channel), Delivers). ?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers).
-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. -spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
end_trace_send(Packets) -> end_trace_send(Packets) ->

View File

@ -94,17 +94,17 @@ stop() ->
%% trace API %% trace API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when -spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
Packet :: emqx_types:packet(), Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(), ChannelInfo :: emqx_external_trace:channel_info(),
Res :: term(). Res :: term().
trace_process_publish(Packet, Channel, ProcessFun) -> trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
case maybe_init_ctx(Packet) of case maybe_init_ctx(Packet) of
false -> false ->
ProcessFun(Packet, Channel); ProcessFun(Packet);
RootCtx -> RootCtx ->
RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true), RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true),
Attrs = maps:merge(packet_attributes(Packet), channel_attributes(Channel)), Attrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)),
SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{ SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{
attributes => Attrs attributes => Attrs
}), }),
@ -113,22 +113,22 @@ trace_process_publish(Packet, Channel, ProcessFun) ->
Packet1 = put_ctx_to_packet(Ctx, Packet), Packet1 = put_ctx_to_packet(Ctx, Packet),
_ = otel_ctx:attach(Ctx), _ = otel_ctx:attach(Ctx),
try try
ProcessFun(Packet1, Channel) ProcessFun(Packet1)
after after
_ = ?end_span(), _ = ?end_span(),
clear() clear()
end end
end. end.
-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> -spec start_trace_send(list(emqx_types:deliver()), emqx_external_trace:channel_info()) ->
list(emqx_types:deliver()). list(emqx_types:deliver()).
start_trace_send(Delivers, Channel) -> start_trace_send(Delivers, ChannelInfo) ->
lists:map( lists:map(
fun({deliver, Topic, Msg} = Deliver) -> fun({deliver, Topic, Msg} = Deliver) ->
case get_ctx_from_msg(Msg) of case get_ctx_from_msg(Msg) of
Ctx when is_map(Ctx) -> Ctx when is_map(Ctx) ->
Attrs = maps:merge( Attrs = maps:merge(
msg_attributes(Msg), sub_channel_attributes(Channel) msg_attributes(Msg), sub_channel_attributes(ChannelInfo)
), ),
StartOpts = #{attributes => Attrs}, StartOpts = #{attributes => Attrs},
SpanCtx = otel_tracer:start_span( SpanCtx = otel_tracer:start_span(
@ -216,11 +216,11 @@ msg_attributes(Msg) ->
packet_attributes(#mqtt_packet{variable = Packet}) -> packet_attributes(#mqtt_packet{variable = Packet}) ->
#{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}. #{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}.
channel_attributes(Channel) -> channel_attributes(ChannelInfo) ->
#{'messaging.client_id' => emqx_channel:info(clientid, Channel)}. #{'messaging.client_id' => maps:get(clientid, ChannelInfo, undefined)}.
sub_channel_attributes(Channel) -> sub_channel_attributes(ChannelInfo) ->
channel_attributes(Channel). channel_attributes(ChannelInfo).
put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) -> put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) ->
Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}}; Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};