diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 816ab7b2b..686d524dc 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -398,8 +398,10 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> case emqx_packet:check(Packet) of - ok -> process_publish(Packet, Channel); - {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) + ok -> + emqx_external_trace:trace_process_publish(Packet, Channel, fun process_publish/2); + {error, ReasonCode} -> + handle_out(disconnect, ReasonCode, Channel) end; handle_in( ?PUBACK_PACKET(PacketId, _ReasonCode, Properties), @@ -921,7 +923,11 @@ handle_deliver( Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), {ok, Channel#channel{session = NSession}}; -handle_deliver( +handle_deliver(Delivers, Channel) -> + Delivers1 = emqx_external_trace:start_trace_send(Delivers, Channel), + do_handle_deliver(Delivers1, Channel). + +do_handle_deliver( Delivers, Channel = #channel{ session = Session, diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 11d42f9dd..66160ed36 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -855,9 +855,14 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, State) when is_list(Packets) -> +handle_outgoing(Packets, State) -> + Res = do_handle_outgoing(Packets, State), + emqx_external_trace:end_trace_send(Packets), + Res. + +do_handle_outgoing(Packets, State) when is_list(Packets) -> send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); -handle_outgoing(Packet, State) -> +do_handle_outgoing(Packet, State) -> send((serialize_and_inc_stats_fun(State))(Packet), State). serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> diff --git a/apps/emqx/src/emqx_external_trace.erl b/apps/emqx/src/emqx_external_trace.erl new file mode 100644 index 000000000..7f8823903 --- /dev/null +++ b/apps/emqx/src/emqx_external_trace.erl @@ -0,0 +1,109 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_external_trace). + +-callback trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when + Packet :: emqx_types:packet(), + Channel :: emqx_channel:channel(), + Res :: term(). + +-callback start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> + list(emqx_types:deliver()). + +-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. + +-callback event(EventName :: term(), Attributes :: term()) -> ok. + +-export([ + register_provider/1, + unregister_provider/1, + trace_process_publish/3, + start_trace_send/2, + end_trace_send/1, + event/1, + event/2 +]). + +-define(PROVIDER, {?MODULE, trace_provider}). + +-define(with_provider(IfRegisitered, IfNotRegisired), + case persistent_term:get(?PROVIDER, undefined) of + undefined -> + IfNotRegisired; + Provider -> + Provider:IfRegisitered + end +). + +%%-------------------------------------------------------------------- +%% provider API +%%-------------------------------------------------------------------- + +-spec register_provider(module()) -> ok | {error, term()}. +register_provider(Module) when is_atom(Module) -> + case is_valid_provider(Module) of + true -> + persistent_term:put(?PROVIDER, Module); + false -> + {error, invalid_provider} + end. + +-spec unregister_provider(module()) -> ok | {error, term()}. +unregister_provider(Module) -> + case persistent_term:get(?PROVIDER, undefined) of + Module -> + persistent_term:erase(?PROVIDER), + ok; + _ -> + {error, not_registered} + end. + +%%-------------------------------------------------------------------- +%% trace API +%%-------------------------------------------------------------------- + +-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when + Packet :: emqx_types:packet(), + Channel :: emqx_channel:channel(), + Res :: term(). +trace_process_publish(Packet, Channel, ProcessFun) -> + ?with_provider(?FUNCTION_NAME(Packet, Channel, ProcessFun), ProcessFun(Packet, Channel)). + +-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> + list(emqx_types:deliver()). +start_trace_send(Delivers, Channel) -> + ?with_provider(?FUNCTION_NAME(Delivers, Channel), Delivers). + +-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. +end_trace_send(Packets) -> + ?with_provider(?FUNCTION_NAME(Packets), ok). + +event(Name) -> + event(Name, #{}). + +-spec event(term(), term()) -> ok. +event(Name, Attributes) -> + ?with_provider(?FUNCTION_NAME(Name, Attributes), ok). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +is_valid_provider(Module) -> + lists:all( + fun({F, A}) -> erlang:function_exported(Module, F, A) end, + ?MODULE:behaviour_info(callbacks) + ). diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 20be12c42..0799a24ee 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -960,6 +960,8 @@ serialize_properties(Props) when is_map(Props) -> serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined -> <<>>; +serialize_property(internal_extra, _) -> + <<>>; serialize_property('Payload-Format-Indicator', Val) -> <<16#01, Val>>; serialize_property('Message-Expiry-Interval', Val) -> diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index b65c8360f..6b684c199 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -311,7 +311,8 @@ to_packet( qos = QoS, headers = Headers, topic = Topic, - payload = Payload + payload = Payload, + extra = Extra } ) -> #mqtt_packet{ @@ -324,8 +325,8 @@ to_packet( variable = #mqtt_packet_publish{ topic_name = Topic, packet_id = PacketId, - properties = filter_pub_props( - maps:get(properties, Headers, #{}) + properties = maybe_put_extra( + Extra, filter_pub_props(maps:get(properties, Headers, #{})) ) }, payload = Payload @@ -345,6 +346,11 @@ filter_pub_props(Props) -> Props ). +maybe_put_extra(Extra, Props) when map_size(Extra) > 0 -> + Props#{internal_extra => Extra}; +maybe_put_extra(_Extra, Props) -> + Props. + %% @doc Message to map -spec to_map(emqx_types:message()) -> message_map(). to_map(#message{ diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 9cb23be2e..542dc8b3b 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -452,9 +452,15 @@ to_message( Headers ) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), + {Extra, Props1} = + case maps:take(internal_extra, Props) of + error -> {#{}, Props}; + ExtraProps -> ExtraProps + end, Msg#message{ flags = #{dup => Dup, retain => Retain}, - headers = Headers#{properties => Props} + headers = Headers#{properties => Props1}, + extra = Extra }. -spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message(). diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index f404ff15d..2e4164652 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -207,7 +207,7 @@ t_to_map(_) -> {topic, <<"topic">>}, {payload, <<"payload">>}, {timestamp, emqx_message:timestamp(Msg)}, - {extra, []} + {extra, #{}} ], ?assertEqual(List, emqx_message:to_list(Msg)), ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). @@ -223,7 +223,7 @@ t_from_map(_) -> topic => <<"topic">>, payload => <<"payload">>, timestamp => emqx_message:timestamp(Msg), - extra => [] + extra => #{} }, ?assertEqual(Map, emqx_message:to_map(Msg)), ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))). diff --git a/apps/emqx_opentelemetry/src/emqx_otel_app.erl b/apps/emqx_opentelemetry/src/emqx_otel_app.erl index cf93d7753..f4b579fe5 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_app.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_app.erl @@ -24,10 +24,12 @@ start(_StartType, _StartArgs) -> emqx_otel_config:add_handler(), ok = emqx_otel_config:add_otel_log_handler(), + ok = emqx_otel_trace:ensure_traces(emqx:get_config([opentelemetry, traces])), emqx_otel_sup:start_link(). stop(_State) -> emqx_otel_config:remove_handler(), + _ = emqx_otel_trace:stop(), _ = emqx_otel_config:remove_otel_log_handler(), ok. diff --git a/apps/emqx_opentelemetry/src/emqx_otel_config.erl b/apps/emqx_opentelemetry/src/emqx_otel_config.erl index 11e97dcdd..d2cc81521 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_config.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_config.erl @@ -54,27 +54,24 @@ remove_handler() -> post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) -> ok; -post_config_update(?OPTL, _Req, New, _Old, AppEnvs) -> +post_config_update(?OPTL, _Req, New, Old, AppEnvs) -> application:set_env(AppEnvs), - MetricsRes = ensure_otel_metrics(New), - LogsRes = ensure_otel_logs(New), + MetricsRes = ensure_otel_metrics(New, Old), + LogsRes = ensure_otel_logs(New, Old), + TracesRes = ensure_otel_traces(New, Old), _ = maybe_stop_all_otel_apps(New), - case {MetricsRes, LogsRes} of - {ok, ok} -> ok; + case {MetricsRes, LogsRes, TracesRes} of + {ok, ok, ok} -> ok; Other -> {error, Other} end; post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> ok. stop_all_otel_apps() -> - _ = application:stop(opentelemetry), - _ = application:stop(opentelemetry_experimental), - _ = application:stop(opentelemetry_experimental_api), - _ = application:stop(opentelemetry_exporter), - ok. + stop_all_otel_apps(true). add_otel_log_handler() -> - ensure_otel_logs(emqx:get_config(?OPTL)). + ensure_otel_logs(emqx:get_config(?OPTL), #{}). remove_otel_log_handler() -> remove_handler_if_present(?OTEL_LOG_HANDLER_ID). @@ -93,23 +90,34 @@ otel_exporter(ExporterConf) -> %% Internal functions -ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) -> +ensure_otel_metrics(#{metrics := MetricsConf}, #{metrics := MetricsConf}) -> + ok; +ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}, _Old) -> _ = emqx_otel_metrics:stop_otel(), emqx_otel_metrics:start_otel(MetricsConf); -ensure_otel_metrics(#{metrics := #{enable := false}}) -> +ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) -> emqx_otel_metrics:stop_otel(); -ensure_otel_metrics(_) -> +ensure_otel_metrics(_, _) -> ok. -ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) -> +ensure_otel_logs(#{logs := LogsConf}, #{logs := LogsConf}) -> + ok; +ensure_otel_logs(#{logs := #{enable := true} = LogsConf}, _OldConf) -> ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID), ok = ensure_log_apps(), HandlerConf = tr_handler_conf(LogsConf), %% NOTE: should primary logger level be updated if it's higher than otel log level? logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf); -ensure_otel_logs(#{logs := #{enable := false}}) -> +ensure_otel_logs(#{logs := #{enable := false}}, _OldConf) -> remove_handler_if_present(?OTEL_LOG_HANDLER_ID). +ensure_otel_traces(#{traces := TracesConf}, #{traces := TracesConf}) -> + ok; +ensure_otel_traces(#{traces := #{enable := true} = TracesConf}, _OldConf) -> + emqx_otel_trace:start(TracesConf); +ensure_otel_traces(#{traces := #{enable := false}}, _OldConf) -> + emqx_otel_trace:stop(). + remove_handler_if_present(HandlerId) -> case logger:get_handler_config(HandlerId) of {ok, _} -> @@ -123,8 +131,13 @@ ensure_log_apps() -> {ok, _} = application:ensure_all_started(opentelemetry_experimental), ok. -maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) -> - stop_all_otel_apps(); +maybe_stop_all_otel_apps(#{ + metrics := #{enable := false}, + logs := #{enable := false}, + traces := #{enable := false} +}) -> + IsShutdown = false, + stop_all_otel_apps(IsShutdown); maybe_stop_all_otel_apps(_) -> ok. @@ -158,3 +171,18 @@ is_ssl(<<"https://", _/binary>> = _Endpoint) -> true; is_ssl(_Endpoint) -> false. + +stop_all_otel_apps(IsShutdown) -> + %% if traces were enabled, it's not safe to stop opentelemetry app, + %% as there could be not finsihed traces that would crash if spans ETS tables are deleted + _ = + case IsShutdown of + true -> + _ = application:stop(opentelemetry); + false -> + ok + end, + _ = application:stop(opentelemetry_experimental), + _ = application:stop(opentelemetry_experimental_api), + _ = application:stop(opentelemetry_exporter), + ok. diff --git a/apps/emqx_opentelemetry/src/emqx_otel_schema.erl b/apps/emqx_opentelemetry/src/emqx_otel_schema.erl index 927bc9dfd..6359f88a5 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_schema.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_schema.erl @@ -62,6 +62,13 @@ fields("opentelemetry") -> #{ desc => ?DESC(otel_logs) } + )}, + {traces, + ?HOCON( + ?R_REF("otel_traces"), + #{ + desc => ?DESC(otel_traces) + } )} ]; fields("otel_metrics") -> @@ -137,21 +144,94 @@ fields("otel_logs") -> } )} ]; +fields("otel_traces") -> + [ + {enable, + ?HOCON( + boolean(), + #{ + default => false, + desc => ?DESC(enable), + importance => ?IMPORTANCE_HIGH + } + )}, + {max_queue_size, + ?HOCON( + pos_integer(), + #{ + default => 2048, + desc => ?DESC(max_queue_size), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {exporting_timeout, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"30s">>, + desc => ?DESC(exporting_timeout), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {scheduled_delay, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(scheduled_delay), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {exporter, + ?HOCON( + ?R_REF("otel_traces_exporter"), + #{ + desc => ?DESC(exporter), + importance => ?IMPORTANCE_HIGH + } + )}, + {filter, + ?HOCON( + ?R_REF("trace_filter"), + #{ + desc => ?DESC(trace_filter), + importance => ?IMPORTANCE_MEDIUM + } + )} + ]; fields("otel_metrics_exporter") -> exporter_fields(metrics); fields("otel_logs_exporter") -> exporter_fields(logs); fields("ssl_opts") -> Schema = emqx_schema:client_ssl_opts_schema(#{}), - lists:keydelete("enable", 1, Schema). + lists:keydelete("enable", 1, Schema); +fields("otel_traces_exporter") -> + exporter_fields(traces); +fields("trace_filter") -> + %% More filters can be implemented in future, e.g. topic, clientid + [ + {trace_all, + ?HOCON( + boolean(), + #{ + default => false, + desc => ?DESC(trace_all), + importance => ?IMPORTANCE_MEDIUM + } + )} + ]. desc("opentelemetry") -> ?DESC(opentelemetry); desc("exporter") -> ?DESC(exporter); desc("otel_logs_exporter") -> ?DESC(exporter); desc("otel_metrics_exporter") -> ?DESC(exporter); +desc("otel_traces_exporter") -> ?DESC(exporter); desc("otel_logs") -> ?DESC(otel_logs); desc("otel_metrics") -> ?DESC(otel_metrics); +desc("otel_traces") -> ?DESC(otel_traces); desc("ssl_opts") -> ?DESC(exporter_ssl); +desc("trace_filter") -> ?DESC(trace_filter); desc(_) -> undefined. exporter_fields(OtelSignal) -> diff --git a/apps/emqx_opentelemetry/src/emqx_otel_trace.erl b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl new file mode 100644 index 000000000..0c78f0abd --- /dev/null +++ b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl @@ -0,0 +1,277 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_otel_trace). + +-behaviour(emqx_external_trace). + +-export([ + ensure_traces/1, + start/1, + stop/0 +]). + +-export([toggle_registered/1]). + +-export([ + trace_process_publish/3, + start_trace_send/2, + end_trace_send/1, + event/2 +]). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("opentelemetry_api/include/otel_tracer.hrl"). + +-define(EMQX_OTEL_CTX, otel_ctx). +%% NOTE: it's possible to use trace_flags to set is_sampled flag +-define(IS_ENABLED, emqx_enable). +-define(USER_PROPERTY, 'User-Property'). + +-define(TRACE_ALL_KEY, {?MODULE, trace_all}). +-define(TRACE_ALL, persistent_term:get(?TRACE_ALL_KEY, false)). + +%%-------------------------------------------------------------------- +%% config +%%-------------------------------------------------------------------- + +-spec toggle_registered(boolean()) -> ok | {error, term()}. +toggle_registered(true = _Enable) -> + emqx_external_trace:register_provider(?MODULE); +toggle_registered(false = _Enable) -> + _ = emqx_external_trace:unregister_provider(?MODULE), + ok. + +-spec ensure_traces(map()) -> ok | {error, term()}. +ensure_traces(#{enable := true} = Conf) -> + start(Conf); +ensure_traces(_Conf) -> + ok. + +-spec start(map()) -> ok | {error, term()}. +start(Conf) -> + _ = safe_stop_default_tracer(), + #{ + exporter := Exporter, + max_queue_size := MaxQueueSize, + exporting_timeout := ExportingTimeout, + scheduled_delay := ScheduledDelay, + filter := #{trace_all := TraceAll} + } = Conf, + OtelEnv = [ + {bsp_scheduled_delay_ms, ScheduledDelay}, + {bsp_exporting_timeout_ms, ExportingTimeout}, + {bsp_max_queue_size, MaxQueueSize}, + {traces_exporter, emqx_otel_config:otel_exporter(Exporter)} + ], + set_trace_all(TraceAll), + ok = application:set_env([{opentelemetry, OtelEnv}]), + _ = application:ensure_all_started(opentelemetry), + Res = assert_started(opentelemetry:start_default_tracer_provider()), + case Res of + ok -> + _ = toggle_registered(true), + Res; + Err -> + Err + end. + +-spec stop() -> ok. +stop() -> + _ = toggle_registered(false), + safe_stop_default_tracer(). + +%%-------------------------------------------------------------------- +%% trace API +%%-------------------------------------------------------------------- + +-spec trace_process_publish(Packet, Channel, fun((Packet, Channel) -> Res)) -> Res when + Packet :: emqx_types:packet(), + Channel :: emqx_channel:channel(), + Res :: term(). +trace_process_publish(Packet, Channel, ProcessFun) -> + case maybe_init_ctx(Packet) of + false -> + ProcessFun(Packet, Channel); + RootCtx -> + RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true), + Attrs = maps:merge(packet_attributes(Packet), channel_attributes(Channel)), + SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{ + attributes => Attrs + }), + Ctx = otel_tracer:set_current_span(RootCtx1, SpanCtx), + %% put ctx to packet, so it can be further propagated + Packet1 = put_ctx_to_packet(Ctx, Packet), + %% TODO: consider getting rid of propagating Ctx through process dict as it's anyway seems to have + %% very limited usage + otel_ctx:attach(Ctx), + try + ProcessFun(Packet1, Channel) + after + ?end_span(), + clear() + end + end. + +-spec start_trace_send(list(emqx_types:deliver()), emqx_channel:channel()) -> + list(emqx_types:deliver()). +start_trace_send(Delivers, Channel) -> + lists:map( + fun({deliver, Topic, Msg} = Deliver) -> + case get_ctx_from_msg(Msg) of + Ctx when is_map(Ctx) -> + Attrs = maps:merge( + msg_attributes(Msg), sub_channel_attributes(Channel) + ), + StartOpts = #{attributes => Attrs}, + SpanCtx = otel_tracer:start_span( + Ctx, ?current_tracer, send_published_message, StartOpts + ), + Msg1 = put_ctx_to_msg( + otel_tracer:set_current_span(Ctx, SpanCtx), Msg + ), + {deliver, Topic, Msg1}; + _ -> + Deliver + end + end, + Delivers + ). + +-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. +end_trace_send(Packets) -> + lists:foreach( + fun(Packet) -> + case get_ctx_from_packet(Packet) of + Ctx when is_map(Ctx) -> + otel_span:end_span(otel_tracer:current_span_ctx(Ctx)); + _ -> + ok + end + end, + packets_list(Packets) + ). + +%% NOTE: adds an event only within an active span (Otel Ctx must be set in the calling process dict) +-spec event(opentelemetry:event_name(), opentelemetry:attributes_map()) -> ok. +event(Name, Attributes) -> + case otel_ctx:get_value(?IS_ENABLED, false) of + true -> + ?add_event(Name, Attributes), + ok; + false -> + ok + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +packets_list(Packets) when is_list(Packets) -> + Packets; +packets_list(Packet) -> + [Packet]. + +maybe_init_ctx(#mqtt_packet{variable = Packet}) -> + case should_trace_packet(Packet) of + true -> + Ctx = extract_traceparent_from_packet(Packet), + should_trace_context(Ctx) andalso Ctx; + false -> + false + end. + +extract_traceparent_from_packet(Packet) -> + Ctx = otel_ctx:new(), + case emqx_packet:info(properties, Packet) of + #{?USER_PROPERTY := UserProps} -> + otel_propagator_text_map:extract_to(Ctx, UserProps); + _ -> + Ctx + end. + +should_trace_context(RootCtx) -> + map_size(RootCtx) > 0 orelse ?TRACE_ALL. + +should_trace_packet(Packet) -> + not is_sys(emqx_packet:info(topic_name, Packet)). + +%% TODO: move to emqx_topic module? +is_sys(<<"$SYS/", _/binary>> = _Topic) -> true; +is_sys(_Topic) -> false. + +msg_attributes(Msg) -> + #{ + 'messaging.destination.name' => emqx_message:topic(Msg), + 'messaging.client_id' => emqx_message:from(Msg) + }. + +packet_attributes(#mqtt_packet{variable = Packet}) -> + #{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}. + +channel_attributes(Channel) -> + #{'messaging.client_id' => emqx_channel:info(clientid, Channel)}. + +sub_channel_attributes(Channel) -> + channel_attributes(Channel). + +put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) -> + Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}}; +%% extra field has not being used previously and defaulted to an empty list, it's safe to overwrite it +put_ctx_to_msg(OtelCtx, Msg) when is_record(Msg, message) -> + Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}}. + +put_ctx_to_packet( + OtelCtx, #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet +) -> + Extra = maps:get(internal_extra, Props, #{}), + Props1 = Props#{internal_extra => Extra#{?EMQX_OTEL_CTX => OtelCtx}}, + Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}}. + +get_ctx_from_msg(#message{extra = Extra}) -> + from_extra(Extra). + +get_ctx_from_packet(#mqtt_packet{ + variable = #mqtt_packet_publish{properties = #{internal_extra := Extra}} +}) -> + from_extra(Extra); +get_ctx_from_packet(_) -> + undefined. + +from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) -> + OtelCtx; +from_extra(_) -> + undefined. + +clear() -> + otel_ctx:clear(). + +safe_stop_default_tracer() -> + try + _ = opentelemetry:stop_default_tracer_provider() + catch + %% noramal scenario, opentelemetry supervisor is not started + exit:{noproc, _} -> ok + end, + ok. + +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_started, _Pid}}) -> ok; +assert_started({error, Reason}) -> {error, Reason}. + +set_trace_all(TraceAll) -> + persistent_term:put({?MODULE, trace_all}, TraceAll). diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index a0d196fa9..cbb452c41 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -36,8 +36,8 @@ payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer(), - %% not used so far, for future extension - extra = [] :: term() + %% Miscellaneous extensions, currently used for OpenTelemetry context propagation + extra = #{} :: term() }). -endif. diff --git a/changes/ce/feat-11984.en.md b/changes/ce/feat-11984.en.md new file mode 100644 index 000000000..e4e0a7717 --- /dev/null +++ b/changes/ce/feat-11984.en.md @@ -0,0 +1 @@ +Implemented Open Telemetry distributed tracing feature. diff --git a/rel/i18n/emqx_otel_schema.hocon b/rel/i18n/emqx_otel_schema.hocon index 9e59b2a76..0a41874b9 100644 --- a/rel/i18n/emqx_otel_schema.hocon +++ b/rel/i18n/emqx_otel_schema.hocon @@ -11,6 +11,9 @@ otel_logs.label: "Open Telemetry Logs" otel_metrics.desc: "Open Telemetry Metrics configuration." otel_metrics.label: "Open Telemetry Metrics" +otel_traces.desc: "Open Telemetry Traces configuration." +otel_traces.label: "Open Telemetry Traces" + enable.desc: "Enable or disable Open Telemetry signal." enable.label: "Enable." @@ -41,4 +44,13 @@ otel_log_handler_level.desc: """The log level of the Open Telemetry log handler.""" otel_log_handler_level.label: "Log Level" +trace_filter.desc: "Open Telemetry Trace Filter configuration" +trace_filter.label: "Trace Filter" + +trace_all.desc: +"""If enabled, all published messages are traced, a new trace ID is generated if it can't be extracted from the message. +Otherwise, only messages published with trace context are traced. Disabled by default.""" +trace_all.label: "Trace All" + + }