feat: integrate OpenTelemetry traces

This commit is contained in:
Serge Tupchii 2023-11-20 17:05:10 +02:00
parent 89732cb4e4
commit 7fdc650448
14 changed files with 566 additions and 32 deletions

View File

@ -398,8 +398,10 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> process_publish(Packet, Channel); ok ->
{error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) emqx_external_trace:trace_process_publish(Packet, Channel, fun process_publish/2);
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end; end;
handle_in( handle_in(
?PUBACK_PACKET(PacketId, _ReasonCode, Properties), ?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
@ -921,7 +923,11 @@ handle_deliver(
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
{ok, Channel#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
handle_deliver( handle_deliver(Delivers, Channel) ->
Delivers1 = emqx_external_trace:start_trace_send(Delivers, Channel),
do_handle_deliver(Delivers1, Channel).
do_handle_deliver(
Delivers, Delivers,
Channel = #channel{ Channel = #channel{
session = Session, session = Session,

View File

@ -855,9 +855,14 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packets %% Handle outgoing packets
handle_outgoing(Packets, State) when is_list(Packets) -> handle_outgoing(Packets, State) ->
Res = do_handle_outgoing(Packets, State),
emqx_external_trace:end_trace_send(Packets),
Res.
do_handle_outgoing(Packets, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
handle_outgoing(Packet, State) -> do_handle_outgoing(Packet, State) ->
send((serialize_and_inc_stats_fun(State))(Packet), State). send((serialize_and_inc_stats_fun(State))(Packet), State).
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->

View File

@ -0,0 +1,109 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_external_trace).
-callback trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when
Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(),
Res :: term().
-callback start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) ->
list(emqx_types:deliver()).
-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
-callback event(EventName :: term(), Attributes :: term()) -> ok.
-export([
register_provider/1,
unregister_provider/1,
trace_process_publish/3,
start_trace_send/2,
end_trace_send/1,
event/1,
event/2
]).
-define(PROVIDER, {?MODULE, trace_provider}).
-define(with_provider(IfRegisitered, IfNotRegisired),
case persistent_term:get(?PROVIDER, undefined) of
undefined ->
IfNotRegisired;
Provider ->
Provider:IfRegisitered
end
).
%%--------------------------------------------------------------------
%% provider API
%%--------------------------------------------------------------------
-spec register_provider(module()) -> ok | {error, term()}.
register_provider(Module) when is_atom(Module) ->
case is_valid_provider(Module) of
true ->
persistent_term:put(?PROVIDER, Module);
false ->
{error, invalid_provider}
end.
-spec unregister_provider(module()) -> ok | {error, term()}.
unregister_provider(Module) ->
case persistent_term:get(?PROVIDER, undefined) of
Module ->
persistent_term:erase(?PROVIDER),
ok;
_ ->
{error, not_registered}
end.
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------
-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when
Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(),
Res :: term().
trace_process_publish(Packet, Channel, ProcessFun) ->
?with_provider(?FUNCTION_NAME(Packet, Channel, ProcessFun), ProcessFun(Packet, Channel)).
-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) ->
list(emqx_types:deliver()).
start_trace_send(Delivers, Channel) ->
?with_provider(?FUNCTION_NAME(Delivers, Channel), Delivers).
-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
end_trace_send(Packets) ->
?with_provider(?FUNCTION_NAME(Packets), ok).
event(Name) ->
event(Name, #{}).
-spec event(term(), term()) -> ok.
event(Name, Attributes) ->
?with_provider(?FUNCTION_NAME(Name, Attributes), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
is_valid_provider(Module) ->
lists:all(
fun({F, A}) -> erlang:function_exported(Module, F, A) end,
?MODULE:behaviour_info(callbacks)
).

View File

@ -960,6 +960,8 @@ serialize_properties(Props) when is_map(Props) ->
serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined -> serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined ->
<<>>; <<>>;
serialize_property(internal_extra, _) ->
<<>>;
serialize_property('Payload-Format-Indicator', Val) -> serialize_property('Payload-Format-Indicator', Val) ->
<<16#01, Val>>; <<16#01, Val>>;
serialize_property('Message-Expiry-Interval', Val) -> serialize_property('Message-Expiry-Interval', Val) ->

View File

@ -311,7 +311,8 @@ to_packet(
qos = QoS, qos = QoS,
headers = Headers, headers = Headers,
topic = Topic, topic = Topic,
payload = Payload payload = Payload,
extra = Extra
} }
) -> ) ->
#mqtt_packet{ #mqtt_packet{
@ -324,8 +325,8 @@ to_packet(
variable = #mqtt_packet_publish{ variable = #mqtt_packet_publish{
topic_name = Topic, topic_name = Topic,
packet_id = PacketId, packet_id = PacketId,
properties = filter_pub_props( properties = maybe_put_extra(
maps:get(properties, Headers, #{}) Extra, filter_pub_props(maps:get(properties, Headers, #{}))
) )
}, },
payload = Payload payload = Payload
@ -345,6 +346,11 @@ filter_pub_props(Props) ->
Props Props
). ).
maybe_put_extra(Extra, Props) when map_size(Extra) > 0 ->
Props#{internal_extra => Extra};
maybe_put_extra(_Extra, Props) ->
Props.
%% @doc Message to map %% @doc Message to map
-spec to_map(emqx_types:message()) -> message_map(). -spec to_map(emqx_types:message()) -> message_map().
to_map(#message{ to_map(#message{

View File

@ -452,9 +452,15 @@ to_message(
Headers Headers
) -> ) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
{Extra, Props1} =
case maps:take(internal_extra, Props) of
error -> {#{}, Props};
ExtraProps -> ExtraProps
end,
Msg#message{ Msg#message{
flags = #{dup => Dup, retain => Retain}, flags = #{dup => Dup, retain => Retain},
headers = Headers#{properties => Props} headers = Headers#{properties => Props1},
extra = Extra
}. }.
-spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message(). -spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message().

View File

@ -207,7 +207,7 @@ t_to_map(_) ->
{topic, <<"topic">>}, {topic, <<"topic">>},
{payload, <<"payload">>}, {payload, <<"payload">>},
{timestamp, emqx_message:timestamp(Msg)}, {timestamp, emqx_message:timestamp(Msg)},
{extra, []} {extra, #{}}
], ],
?assertEqual(List, emqx_message:to_list(Msg)), ?assertEqual(List, emqx_message:to_list(Msg)),
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
@ -223,7 +223,7 @@ t_from_map(_) ->
topic => <<"topic">>, topic => <<"topic">>,
payload => <<"payload">>, payload => <<"payload">>,
timestamp => emqx_message:timestamp(Msg), timestamp => emqx_message:timestamp(Msg),
extra => [] extra => #{}
}, },
?assertEqual(Map, emqx_message:to_map(Msg)), ?assertEqual(Map, emqx_message:to_map(Msg)),
?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))). ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).

View File

@ -24,10 +24,12 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
emqx_otel_config:add_handler(), emqx_otel_config:add_handler(),
ok = emqx_otel_config:add_otel_log_handler(), ok = emqx_otel_config:add_otel_log_handler(),
ok = emqx_otel_trace:ensure_traces(emqx:get_config([opentelemetry, traces])),
emqx_otel_sup:start_link(). emqx_otel_sup:start_link().
stop(_State) -> stop(_State) ->
emqx_otel_config:remove_handler(), emqx_otel_config:remove_handler(),
_ = emqx_otel_trace:stop(),
_ = emqx_otel_config:remove_otel_log_handler(), _ = emqx_otel_config:remove_otel_log_handler(),
ok. ok.

View File

@ -54,27 +54,24 @@ remove_handler() ->
post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) -> post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) ->
ok; ok;
post_config_update(?OPTL, _Req, New, _Old, AppEnvs) -> post_config_update(?OPTL, _Req, New, Old, AppEnvs) ->
application:set_env(AppEnvs), application:set_env(AppEnvs),
MetricsRes = ensure_otel_metrics(New), MetricsRes = ensure_otel_metrics(New, Old),
LogsRes = ensure_otel_logs(New), LogsRes = ensure_otel_logs(New, Old),
TracesRes = ensure_otel_traces(New, Old),
_ = maybe_stop_all_otel_apps(New), _ = maybe_stop_all_otel_apps(New),
case {MetricsRes, LogsRes} of case {MetricsRes, LogsRes, TracesRes} of
{ok, ok} -> ok; {ok, ok, ok} -> ok;
Other -> {error, Other} Other -> {error, Other}
end; end;
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok. ok.
stop_all_otel_apps() -> stop_all_otel_apps() ->
_ = application:stop(opentelemetry), stop_all_otel_apps(true).
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
ok.
add_otel_log_handler() -> add_otel_log_handler() ->
ensure_otel_logs(emqx:get_config(?OPTL)). ensure_otel_logs(emqx:get_config(?OPTL), #{}).
remove_otel_log_handler() -> remove_otel_log_handler() ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID). remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
@ -93,23 +90,34 @@ otel_exporter(ExporterConf) ->
%% Internal functions %% Internal functions
ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) -> ensure_otel_metrics(#{metrics := MetricsConf}, #{metrics := MetricsConf}) ->
ok;
ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}, _Old) ->
_ = emqx_otel_metrics:stop_otel(), _ = emqx_otel_metrics:stop_otel(),
emqx_otel_metrics:start_otel(MetricsConf); emqx_otel_metrics:start_otel(MetricsConf);
ensure_otel_metrics(#{metrics := #{enable := false}}) -> ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) ->
emqx_otel_metrics:stop_otel(); emqx_otel_metrics:stop_otel();
ensure_otel_metrics(_) -> ensure_otel_metrics(_, _) ->
ok. ok.
ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) -> ensure_otel_logs(#{logs := LogsConf}, #{logs := LogsConf}) ->
ok;
ensure_otel_logs(#{logs := #{enable := true} = LogsConf}, _OldConf) ->
ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID), ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID),
ok = ensure_log_apps(), ok = ensure_log_apps(),
HandlerConf = tr_handler_conf(LogsConf), HandlerConf = tr_handler_conf(LogsConf),
%% NOTE: should primary logger level be updated if it's higher than otel log level? %% NOTE: should primary logger level be updated if it's higher than otel log level?
logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf); logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf);
ensure_otel_logs(#{logs := #{enable := false}}) -> ensure_otel_logs(#{logs := #{enable := false}}, _OldConf) ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID). remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
ensure_otel_traces(#{traces := TracesConf}, #{traces := TracesConf}) ->
ok;
ensure_otel_traces(#{traces := #{enable := true} = TracesConf}, _OldConf) ->
emqx_otel_trace:start(TracesConf);
ensure_otel_traces(#{traces := #{enable := false}}, _OldConf) ->
emqx_otel_trace:stop().
remove_handler_if_present(HandlerId) -> remove_handler_if_present(HandlerId) ->
case logger:get_handler_config(HandlerId) of case logger:get_handler_config(HandlerId) of
{ok, _} -> {ok, _} ->
@ -123,8 +131,13 @@ ensure_log_apps() ->
{ok, _} = application:ensure_all_started(opentelemetry_experimental), {ok, _} = application:ensure_all_started(opentelemetry_experimental),
ok. ok.
maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) -> maybe_stop_all_otel_apps(#{
stop_all_otel_apps(); metrics := #{enable := false},
logs := #{enable := false},
traces := #{enable := false}
}) ->
IsShutdown = false,
stop_all_otel_apps(IsShutdown);
maybe_stop_all_otel_apps(_) -> maybe_stop_all_otel_apps(_) ->
ok. ok.
@ -158,3 +171,18 @@ is_ssl(<<"https://", _/binary>> = _Endpoint) ->
true; true;
is_ssl(_Endpoint) -> is_ssl(_Endpoint) ->
false. false.
stop_all_otel_apps(IsShutdown) ->
%% if traces were enabled, it's not safe to stop opentelemetry app,
%% as there could be not finsihed traces that would crash if spans ETS tables are deleted
_ =
case IsShutdown of
true ->
_ = application:stop(opentelemetry);
false ->
ok
end,
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
ok.

View File

@ -62,6 +62,13 @@ fields("opentelemetry") ->
#{ #{
desc => ?DESC(otel_logs) desc => ?DESC(otel_logs)
} }
)},
{traces,
?HOCON(
?R_REF("otel_traces"),
#{
desc => ?DESC(otel_traces)
}
)} )}
]; ];
fields("otel_metrics") -> fields("otel_metrics") ->
@ -137,21 +144,94 @@ fields("otel_logs") ->
} }
)} )}
]; ];
fields("otel_traces") ->
[
{enable,
?HOCON(
boolean(),
#{
default => false,
desc => ?DESC(enable),
importance => ?IMPORTANCE_HIGH
}
)},
{max_queue_size,
?HOCON(
pos_integer(),
#{
default => 2048,
desc => ?DESC(max_queue_size),
importance => ?IMPORTANCE_HIDDEN
}
)},
{exporting_timeout,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"30s">>,
desc => ?DESC(exporting_timeout),
importance => ?IMPORTANCE_HIDDEN
}
)},
{scheduled_delay,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC(scheduled_delay),
importance => ?IMPORTANCE_HIDDEN
}
)},
{exporter,
?HOCON(
?R_REF("otel_traces_exporter"),
#{
desc => ?DESC(exporter),
importance => ?IMPORTANCE_HIGH
}
)},
{filter,
?HOCON(
?R_REF("trace_filter"),
#{
desc => ?DESC(trace_filter),
importance => ?IMPORTANCE_MEDIUM
}
)}
];
fields("otel_metrics_exporter") -> fields("otel_metrics_exporter") ->
exporter_fields(metrics); exporter_fields(metrics);
fields("otel_logs_exporter") -> fields("otel_logs_exporter") ->
exporter_fields(logs); exporter_fields(logs);
fields("ssl_opts") -> fields("ssl_opts") ->
Schema = emqx_schema:client_ssl_opts_schema(#{}), Schema = emqx_schema:client_ssl_opts_schema(#{}),
lists:keydelete("enable", 1, Schema). lists:keydelete("enable", 1, Schema);
fields("otel_traces_exporter") ->
exporter_fields(traces);
fields("trace_filter") ->
%% More filters can be implemented in future, e.g. topic, clientid
[
{trace_all,
?HOCON(
boolean(),
#{
default => false,
desc => ?DESC(trace_all),
importance => ?IMPORTANCE_MEDIUM
}
)}
].
desc("opentelemetry") -> ?DESC(opentelemetry); desc("opentelemetry") -> ?DESC(opentelemetry);
desc("exporter") -> ?DESC(exporter); desc("exporter") -> ?DESC(exporter);
desc("otel_logs_exporter") -> ?DESC(exporter); desc("otel_logs_exporter") -> ?DESC(exporter);
desc("otel_metrics_exporter") -> ?DESC(exporter); desc("otel_metrics_exporter") -> ?DESC(exporter);
desc("otel_traces_exporter") -> ?DESC(exporter);
desc("otel_logs") -> ?DESC(otel_logs); desc("otel_logs") -> ?DESC(otel_logs);
desc("otel_metrics") -> ?DESC(otel_metrics); desc("otel_metrics") -> ?DESC(otel_metrics);
desc("otel_traces") -> ?DESC(otel_traces);
desc("ssl_opts") -> ?DESC(exporter_ssl); desc("ssl_opts") -> ?DESC(exporter_ssl);
desc("trace_filter") -> ?DESC(trace_filter);
desc(_) -> undefined. desc(_) -> undefined.
exporter_fields(OtelSignal) -> exporter_fields(OtelSignal) ->

View File

@ -0,0 +1,277 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_trace).
-behaviour(emqx_external_trace).
-export([
ensure_traces/1,
start/1,
stop/0
]).
-export([toggle_registered/1]).
-export([
trace_process_publish/3,
start_trace_send/2,
end_trace_send/1,
event/2
]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("opentelemetry_api/include/otel_tracer.hrl").
-define(EMQX_OTEL_CTX, otel_ctx).
%% NOTE: it's possible to use trace_flags to set is_sampled flag
-define(IS_ENABLED, emqx_enable).
-define(USER_PROPERTY, 'User-Property').
-define(TRACE_ALL_KEY, {?MODULE, trace_all}).
-define(TRACE_ALL, persistent_term:get(?TRACE_ALL_KEY, false)).
%%--------------------------------------------------------------------
%% config
%%--------------------------------------------------------------------
-spec toggle_registered(boolean()) -> ok | {error, term()}.
toggle_registered(true = _Enable) ->
emqx_external_trace:register_provider(?MODULE);
toggle_registered(false = _Enable) ->
_ = emqx_external_trace:unregister_provider(?MODULE),
ok.
-spec ensure_traces(map()) -> ok | {error, term()}.
ensure_traces(#{enable := true} = Conf) ->
start(Conf);
ensure_traces(_Conf) ->
ok.
-spec start(map()) -> ok | {error, term()}.
start(Conf) ->
_ = safe_stop_default_tracer(),
#{
exporter := Exporter,
max_queue_size := MaxQueueSize,
exporting_timeout := ExportingTimeout,
scheduled_delay := ScheduledDelay,
filter := #{trace_all := TraceAll}
} = Conf,
OtelEnv = [
{bsp_scheduled_delay_ms, ScheduledDelay},
{bsp_exporting_timeout_ms, ExportingTimeout},
{bsp_max_queue_size, MaxQueueSize},
{traces_exporter, emqx_otel_config:otel_exporter(Exporter)}
],
set_trace_all(TraceAll),
ok = application:set_env([{opentelemetry, OtelEnv}]),
_ = application:ensure_all_started(opentelemetry),
Res = assert_started(opentelemetry:start_default_tracer_provider()),
case Res of
ok ->
_ = toggle_registered(true),
Res;
Err ->
Err
end.
-spec stop() -> ok.
stop() ->
_ = toggle_registered(false),
safe_stop_default_tracer().
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------
-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when
Packet :: emqx_types:packet(),
Channel :: emqx_channel:channel(),
Res :: term().
trace_process_publish(Packet, Channel, ProcessFun) ->
case maybe_init_ctx(Packet) of
false ->
ProcessFun(Packet, Channel);
RootCtx ->
RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true),
Attrs = maps:merge(packet_attributes(Packet), channel_attributes(Channel)),
SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{
attributes => Attrs
}),
Ctx = otel_tracer:set_current_span(RootCtx1, SpanCtx),
%% put ctx to packet, so it can be further propagated
Packet1 = put_ctx_to_packet(Ctx, Packet),
%% TODO: consider getting rid of propagating Ctx through process dict as it's anyway seems to have
%% very limited usage
otel_ctx:attach(Ctx),
try
ProcessFun(Packet1, Channel)
after
?end_span(),
clear()
end
end.
-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) ->
list(emqx_types:deliver()).
start_trace_send(Delivers, Channel) ->
lists:map(
fun({deliver, Topic, Msg} = Deliver) ->
case get_ctx_from_msg(Msg) of
Ctx when is_map(Ctx) ->
Attrs = maps:merge(
msg_attributes(Msg), sub_channel_attributes(Channel)
),
StartOpts = #{attributes => Attrs},
SpanCtx = otel_tracer:start_span(
Ctx, ?current_tracer, send_published_message, StartOpts
),
Msg1 = put_ctx_to_msg(
otel_tracer:set_current_span(Ctx, SpanCtx), Msg
),
{deliver, Topic, Msg1};
_ ->
Deliver
end
end,
Delivers
).
-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
end_trace_send(Packets) ->
lists:foreach(
fun(Packet) ->
case get_ctx_from_packet(Packet) of
Ctx when is_map(Ctx) ->
otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
_ ->
ok
end
end,
packets_list(Packets)
).
%% NOTE: adds an event only within an active span (Otel Ctx must be set in the calling process dict)
-spec event(opentelemetry:event_name(), opentelemetry:attributes_map()) -> ok.
event(Name, Attributes) ->
case otel_ctx:get_value(?IS_ENABLED, false) of
true ->
?add_event(Name, Attributes),
ok;
false ->
ok
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
packets_list(Packets) when is_list(Packets) ->
Packets;
packets_list(Packet) ->
[Packet].
maybe_init_ctx(#mqtt_packet{variable = Packet}) ->
case should_trace_packet(Packet) of
true ->
Ctx = extract_traceparent_from_packet(Packet),
should_trace_context(Ctx) andalso Ctx;
false ->
false
end.
extract_traceparent_from_packet(Packet) ->
Ctx = otel_ctx:new(),
case emqx_packet:info(properties, Packet) of
#{?USER_PROPERTY := UserProps} ->
otel_propagator_text_map:extract_to(Ctx, UserProps);
_ ->
Ctx
end.
should_trace_context(RootCtx) ->
map_size(RootCtx) > 0 orelse ?TRACE_ALL.
should_trace_packet(Packet) ->
not is_sys(emqx_packet:info(topic_name, Packet)).
%% TODO: move to emqx_topic module?
is_sys(<<"$SYS/", _/binary>> = _Topic) -> true;
is_sys(_Topic) -> false.
msg_attributes(Msg) ->
#{
'messaging.destination.name' => emqx_message:topic(Msg),
'messaging.client_id' => emqx_message:from(Msg)
}.
packet_attributes(#mqtt_packet{variable = Packet}) ->
#{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}.
channel_attributes(Channel) ->
#{'messaging.client_id' => emqx_channel:info(clientid, Channel)}.
sub_channel_attributes(Channel) ->
channel_attributes(Channel).
put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) ->
Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};
%% extra field has not being used previously and defaulted to an empty list, it's safe to overwrite it
put_ctx_to_msg(OtelCtx, Msg) when is_record(Msg, message) ->
Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}}.
put_ctx_to_packet(
OtelCtx, #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
) ->
Extra = maps:get(internal_extra, Props, #{}),
Props1 = Props#{internal_extra => Extra#{?EMQX_OTEL_CTX => OtelCtx}},
Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}}.
get_ctx_from_msg(#message{extra = Extra}) ->
from_extra(Extra).
get_ctx_from_packet(#mqtt_packet{
variable = #mqtt_packet_publish{properties = #{internal_extra := Extra}}
}) ->
from_extra(Extra);
get_ctx_from_packet(_) ->
undefined.
from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) ->
OtelCtx;
from_extra(_) ->
undefined.
clear() ->
otel_ctx:clear().
safe_stop_default_tracer() ->
try
_ = opentelemetry:stop_default_tracer_provider()
catch
%% noramal scenario, opentelemetry supervisor is not started
exit:{noproc, _} -> ok
end,
ok.
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> {error, Reason}.
set_trace_all(TraceAll) ->
persistent_term:put({?MODULE, trace_all}, TraceAll).

View File

@ -36,8 +36,8 @@
payload :: emqx_types:payload(), payload :: emqx_types:payload(),
%% Timestamp (Unit: millisecond) %% Timestamp (Unit: millisecond)
timestamp :: integer(), timestamp :: integer(),
%% not used so far, for future extension %% Miscellaneous extensions, currently used for OpenTelemetry context propagation
extra = [] :: term() extra = #{} :: term()
}). }).
-endif. -endif.

View File

@ -0,0 +1 @@
Implemented Open Telemetry distributed tracing feature.

View File

@ -11,6 +11,9 @@ otel_logs.label: "Open Telemetry Logs"
otel_metrics.desc: "Open Telemetry Metrics configuration." otel_metrics.desc: "Open Telemetry Metrics configuration."
otel_metrics.label: "Open Telemetry Metrics" otel_metrics.label: "Open Telemetry Metrics"
otel_traces.desc: "Open Telemetry Traces configuration."
otel_traces.label: "Open Telemetry Traces"
enable.desc: "Enable or disable Open Telemetry signal." enable.desc: "Enable or disable Open Telemetry signal."
enable.label: "Enable." enable.label: "Enable."
@ -41,4 +44,13 @@ otel_log_handler_level.desc:
"""The log level of the Open Telemetry log handler.""" """The log level of the Open Telemetry log handler."""
otel_log_handler_level.label: "Log Level" otel_log_handler_level.label: "Log Level"
trace_filter.desc: "Open Telemetry Trace Filter configuration"
trace_filter.label: "Trace Filter"
trace_all.desc:
"""If enabled, all published messages are traced, a new trace ID is generated if it can't be extracted from the message.
Otherwise, only messages published with trace context are traced. Disabled by default."""
trace_all.label: "Trace All"
} }