diff --git a/.ci/docker-compose-file/docker-compose-otel.yaml b/.ci/docker-compose-file/docker-compose-otel.yaml new file mode 100644 index 000000000..a2d1bfae7 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-otel.yaml @@ -0,0 +1,69 @@ +version: '3.9' + +services: + jaeger-all-in-one: + image: jaegertracing/all-in-one:1.51.0 + container_name: jaeger.emqx.net + hostname: jaeger.emqx.net + networks: + - emqx_bridge + restart: always +# ports: +# - "16686:16686" + user: "${DOCKER_USER:-root}" + + # Collector + otel-collector: + image: otel/opentelemetry-collector:0.90.0 + container_name: otel-collector.emqx.net + hostname: otel-collector.emqx.net + networks: + - emqx_bridge + restart: always + command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"] + volumes: + - ./otel:/etc/ +# ports: +# - "1888:1888" # pprof extension +# - "8888:8888" # Prometheus metrics exposed by the collector +# - "8889:8889" # Prometheus exporter metrics +# - "13133:13133" # health_check extension +# - "4317:4317" # OTLP gRPC receiver +# - "4318:4318" # OTLP http receiver +# - "55679:55679" # zpages extension + depends_on: + - jaeger-all-in-one + user: "${DOCKER_USER:-root}" + + +# Collector + otel-collector-tls: + image: otel/opentelemetry-collector:0.90.0 + container_name: otel-collector-tls.emqx.net + hostname: otel-collector-tls.emqx.net + networks: + - emqx_bridge + restart: always + command: ["--config=/etc/otel-collector-config-tls.yaml", "${OTELCOL_ARGS}"] + volumes: + - ./otel:/etc/ + - ./certs:/etc/certs + # ports: + # - "14317:4317" # OTLP gRPC receiver + depends_on: + - jaeger-all-in-one + user: "${DOCKER_USER:-root}" + +#networks: +# emqx_bridge: +# driver: bridge +# name: emqx_bridge +# enable_ipv6: true +# ipam: +# driver: default +# config: +# - subnet: 172.100.239.0/24 +# gateway: 172.100.239.1 +# - subnet: 2001:3200:3200::/64 +# gateway: 2001:3200:3200::1 +# diff --git a/.ci/docker-compose-file/otel/.gitignore 
b/.ci/docker-compose-file/otel/.gitignore new file mode 100644 index 000000000..98dacbd74 --- /dev/null +++ b/.ci/docker-compose-file/otel/.gitignore @@ -0,0 +1,6 @@ +certs +hostname +hosts +otel-collector.json +otel-collector-tls.json +resolv.conf diff --git a/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml b/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml new file mode 100644 index 000000000..9163fc724 --- /dev/null +++ b/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml @@ -0,0 +1,52 @@ +receivers: + otlp: + protocols: + grpc: + tls: + ca_file: /etc/certs/ca.crt + cert_file: /etc/certs/server.crt + key_file: /etc/certs/server.key + http: + tls: + ca_file: /etc/certs/ca.crt + cert_file: /etc/certs/server.crt + key_file: /etc/certs/server.key + +exporters: + logging: + verbosity: detailed + otlp: + endpoint: jaeger.emqx.net:4317 + tls: + insecure: true + debug: + verbosity: detailed + file: + path: /etc/otel-collector-tls.json + + +processors: + batch: + # send data immediately + timeout: 0 + +extensions: + health_check: + zpages: + endpoint: :55679 + +service: + extensions: [zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging, otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file] diff --git a/.ci/docker-compose-file/otel/otel-collector-config.yaml b/.ci/docker-compose-file/otel/otel-collector-config.yaml new file mode 100644 index 000000000..6d6650139 --- /dev/null +++ b/.ci/docker-compose-file/otel/otel-collector-config.yaml @@ -0,0 +1,51 @@ +receivers: + otlp: + protocols: + grpc: + tls: +# ca_file: /etc/ca.pem +# cert_file: /etc/server.pem +# key_file: /etc/server.key + http: + tls: +# ca_file: /etc/ca.pem +# cert_file: /etc/server.pem +# key_file: /etc/server.key + +exporters: + logging: + verbosity: detailed + otlp: + endpoint: jaeger.emqx.net:4317 + tls: + 
insecure: true + debug: + verbosity: detailed + file: + path: /etc/otel-collector.json + +processors: + batch: + # send data immediately + timeout: 0 + +extensions: + health_check: + zpages: + endpoint: :55679 + +service: + extensions: [zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging, otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file] diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0d0e57c7e..5069076e5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -398,8 +398,15 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> case emqx_packet:check(Packet) of - ok -> process_publish(Packet, Channel); - {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) + ok -> + emqx_external_trace:trace_process_publish( + Packet, + %% More info can be added in future, but for now only clientid is used + trace_info(Channel), + fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end + ); + {error, ReasonCode} -> + handle_out(disconnect, ReasonCode, Channel) end; handle_in( ?PUBACK_PACKET(PacketId, _ReasonCode, Properties), @@ -921,7 +928,11 @@ handle_deliver( Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), {ok, Channel#channel{session = NSession}}; -handle_deliver( +handle_deliver(Delivers, Channel) -> + Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)), + do_handle_deliver(Delivers1, Channel). + +do_handle_deliver( Delivers, Channel = #channel{ session = Session, @@ -1429,6 +1440,10 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) -> emqx_olp:backoff(Zone), ok. 
+trace_info(Channel) -> + %% More info can be added in future, but for now only clientid is used + maps:from_list(info([clientid], Channel)). + %%-------------------------------------------------------------------- %% Enrich MQTT Connect Info diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 11d42f9dd..66160ed36 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -855,9 +855,14 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, State) when is_list(Packets) -> +handle_outgoing(Packets, State) -> + Res = do_handle_outgoing(Packets, State), + emqx_external_trace:end_trace_send(Packets), + Res. + +do_handle_outgoing(Packets, State) when is_list(Packets) -> send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); -handle_outgoing(Packet, State) -> +do_handle_outgoing(Packet, State) -> send((serialize_and_inc_stats_fun(State))(Packet), State). serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> diff --git a/apps/emqx/src/emqx_external_trace.erl b/apps/emqx/src/emqx_external_trace.erl new file mode 100644 index 000000000..1a7df93d0 --- /dev/null +++ b/apps/emqx/src/emqx_external_trace.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_external_trace). + +-callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when + Packet :: emqx_types:packet(), + ChannelInfo :: channel_info(), + Res :: term(). + +-callback start_trace_send(list(emqx_types:deliver()), channel_info()) -> + list(emqx_types:deliver()). + +-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. + +-callback event(EventName :: term(), Attributes :: term()) -> ok. + +-type channel_info() :: #{atom() => _}. + +-export([ + provider/0, + register_provider/1, + unregister_provider/1, + trace_process_publish/3, + start_trace_send/2, + end_trace_send/1, + event/1, + event/2 +]). + +-export_type([channel_info/0]). + +-define(PROVIDER, {?MODULE, trace_provider}). + +-define(with_provider(IfRegistered, IfNotRegistered), + case persistent_term:get(?PROVIDER, undefined) of + undefined -> + IfNotRegistered; + Provider -> + Provider:IfRegistered + end +). + +%%-------------------------------------------------------------------- +%% provider API +%%-------------------------------------------------------------------- + +-spec register_provider(module()) -> ok | {error, term()}. +register_provider(Module) when is_atom(Module) -> + case is_valid_provider(Module) of + true -> + persistent_term:put(?PROVIDER, Module); + false -> + {error, invalid_provider} + end. + +-spec unregister_provider(module()) -> ok | {error, term()}. +unregister_provider(Module) -> + case persistent_term:get(?PROVIDER, undefined) of + Module -> + persistent_term:erase(?PROVIDER), + ok; + _ -> + {error, not_registered} + end. + +-spec provider() -> module() | undefined. +provider() -> + persistent_term:get(?PROVIDER, undefined). 
+%%-------------------------------------------------------------------- +%% trace API +%%-------------------------------------------------------------------- + +-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when + Packet :: emqx_types:packet(), + ChannelInfo :: channel_info(), + Res :: term(). +trace_process_publish(Packet, ChannelInfo, ProcessFun) -> + ?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)). + +-spec start_trace_send(list(emqx_types:deliver()), channel_info()) -> + list(emqx_types:deliver()). +start_trace_send(Delivers, ChannelInfo) -> + ?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers). + +-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok. +end_trace_send(Packets) -> + ?with_provider(?FUNCTION_NAME(Packets), ok). + +event(Name) -> + event(Name, #{}). + +-spec event(term(), term()) -> ok. +event(Name, Attributes) -> + ?with_provider(?FUNCTION_NAME(Name, Attributes), ok). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +is_valid_provider(Module) -> + lists:all( + fun({F, A}) -> erlang:function_exported(Module, F, A) end, + ?MODULE:behaviour_info(callbacks) + ). 
diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 20be12c42..0799a24ee 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -960,6 +960,8 @@ serialize_properties(Props) when is_map(Props) -> serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined -> <<>>; +serialize_property(internal_extra, _) -> + <<>>; serialize_property('Payload-Format-Indicator', Val) -> <<16#01, Val>>; serialize_property('Message-Expiry-Interval', Val) -> diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index b65c8360f..6b684c199 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -311,7 +311,8 @@ to_packet( qos = QoS, headers = Headers, topic = Topic, - payload = Payload + payload = Payload, + extra = Extra } ) -> #mqtt_packet{ @@ -324,8 +325,8 @@ to_packet( variable = #mqtt_packet_publish{ topic_name = Topic, packet_id = PacketId, - properties = filter_pub_props( - maps:get(properties, Headers, #{}) + properties = maybe_put_extra( + Extra, filter_pub_props(maps:get(properties, Headers, #{})) ) }, payload = Payload @@ -345,6 +346,11 @@ filter_pub_props(Props) -> Props ). +maybe_put_extra(Extra, Props) when map_size(Extra) > 0 -> + Props#{internal_extra => Extra}; +maybe_put_extra(_Extra, Props) -> + Props. + %% @doc Message to map -spec to_map(emqx_types:message()) -> message_map(). to_map(#message{ diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 9cb23be2e..542dc8b3b 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -452,9 +452,15 @@ to_message( Headers ) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), + {Extra, Props1} = + case maps:take(internal_extra, Props) of + error -> {#{}, Props}; + ExtraProps -> ExtraProps + end, Msg#message{ flags = #{dup => Dup, retain => Retain}, - headers = Headers#{properties => Props} + headers = Headers#{properties => Props1}, + extra = Extra }. 
-spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message(). diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index f404ff15d..2e4164652 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -207,7 +207,7 @@ t_to_map(_) -> {topic, <<"topic">>}, {payload, <<"payload">>}, {timestamp, emqx_message:timestamp(Msg)}, - {extra, []} + {extra, #{}} ], ?assertEqual(List, emqx_message:to_list(Msg)), ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). @@ -223,7 +223,7 @@ t_from_map(_) -> topic => <<"topic">>, payload => <<"payload">>, timestamp => emqx_message:timestamp(Msg), - extra => [] + extra => #{} }, ?assertEqual(Map, emqx_message:to_map(Msg)), ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 27f984f51..6dffd5e2d 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -25,12 +25,7 @@ redbug, xmerl, {hocon, load}, - telemetry, - {opentelemetry, load}, - {opentelemetry_api, load}, - {opentelemetry_experimental, load}, - {opentelemetry_api_experimental, load}, - {opentelemetry_exporter, load} + telemetry ], %% must always be of type `load' common_business_apps => diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 36635d50e..8dc385fb3 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -50,6 +50,7 @@ start() -> start_sysmon(), configure_shard_transports(), set_mnesia_extra_diagnostic_checks(), + emqx_otel_app:configure_otel_deps(), ekka:start(), ok. 
diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 7abac0862..afb195543 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -69,9 +69,7 @@ stop_apps() -> ?SLOG(notice, #{msg => "stopping_emqx_apps"}), _ = emqx_alarm_handler:unload(), ok = emqx_conf_app:unset_config_loaded(), - lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())), - %% Mute otel deps application. - ok = emqx_otel_app:stop_deps(). + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). %% Those port apps are terminated after the main apps %% Don't need to stop when reboot. diff --git a/apps/emqx_opentelemetry/docker-ct b/apps/emqx_opentelemetry/docker-ct new file mode 100644 index 000000000..8f7569d06 --- /dev/null +++ b/apps/emqx_opentelemetry/docker-ct @@ -0,0 +1 @@ +otel diff --git a/apps/emqx_opentelemetry/rebar.config b/apps/emqx_opentelemetry/rebar.config index 7086a2f29..a14c5922e 100644 --- a/apps/emqx_opentelemetry/rebar.config +++ b/apps/emqx_opentelemetry/rebar.config @@ -1,8 +1,16 @@ %% -*- mode: erlang -*- -{deps, [ - {emqx, {path, "../emqx"}} -]}. +{deps, + [{emqx, {path, "../emqx"}} + %% trace + , {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_api"}} + , {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry"}} + %% logs, metrics + , {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_experimental"}} + , {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_api_experimental"}} + %% export + , {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_exporter"}} + ]}. 
{edoc_opts, [{preprocess, true}]}. {erl_opts, [ diff --git a/apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src b/apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src index 134ef2a44..695ba7ae9 100644 --- a/apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src +++ b/apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src @@ -8,7 +8,12 @@ stdlib, emqx, %% otel metrics depend on emqx_mgmt_cache - emqx_management + emqx_management, + opentelemetry_exporter, + opentelemetry, + opentelemetry_experimental, + opentelemetry_api, + opentelemetry_api_experimental ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_opentelemetry/src/emqx_otel_api.erl b/apps/emqx_opentelemetry/src/emqx_otel_api.erl index d8c76ebcf..f14bdb00b 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_api.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_api.erl @@ -103,24 +103,19 @@ otel_config_schema() -> otel_config_example() -> #{ + exporter => #{ + endpoint => "http://localhost:4317", + ssl_options => #{} + }, logs => #{ enable => true, - exporter => #{ - endpoint => "http://localhost:4317", - ssl_options => #{ - enable => false - } - }, level => warning }, metrics => #{ + enable => true + }, + traces => #{ enable => true, - exporter => #{ - endpoint => "http://localhost:4317", - interval => "10s", - ssl_options => #{ - enable => false - } - } + filter => #{trace_all => false} } }. diff --git a/apps/emqx_opentelemetry/src/emqx_otel_app.erl b/apps/emqx_opentelemetry/src/emqx_otel_app.erl index cf93d7753..014785c6d 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_app.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_app.erl @@ -19,17 +19,26 @@ -behaviour(application). -export([start/2, stop/1]). --export([stop_deps/0]). +-export([configure_otel_deps/0]). start(_StartType, _StartArgs) -> emqx_otel_config:add_handler(), ok = emqx_otel_config:add_otel_log_handler(), + ok = emqx_otel_trace:ensure_traces(emqx:get_config([opentelemetry])), emqx_otel_sup:start_link(). 
stop(_State) -> emqx_otel_config:remove_handler(), + _ = emqx_otel_trace:stop(), _ = emqx_otel_config:remove_otel_log_handler(), ok. -stop_deps() -> - emqx_otel_config:stop_all_otel_apps(). +configure_otel_deps() -> + %% default tracer and metrics are started only on demand + ok = application:set_env( + [ + {opentelemetry, [{start_default_tracer, false}]}, + {opentelemetry_experimental, [{start_default_metrics, false}]} + ], + [{persistent, true}] + ). diff --git a/apps/emqx_opentelemetry/src/emqx_otel_config.erl b/apps/emqx_opentelemetry/src/emqx_otel_config.erl index 11e97dcdd..0d2f9988b 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_config.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_config.erl @@ -27,7 +27,6 @@ -export([post_config_update/5]). -export([update/1]). -export([add_otel_log_handler/0, remove_otel_log_handler/0]). --export([stop_all_otel_apps/0]). -export([otel_exporter/1]). update(Config) -> @@ -54,27 +53,20 @@ remove_handler() -> post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) -> ok; -post_config_update(?OPTL, _Req, New, _Old, AppEnvs) -> +post_config_update(?OPTL, _Req, New, Old, AppEnvs) -> application:set_env(AppEnvs), - MetricsRes = ensure_otel_metrics(New), - LogsRes = ensure_otel_logs(New), - _ = maybe_stop_all_otel_apps(New), - case {MetricsRes, LogsRes} of - {ok, ok} -> ok; + MetricsRes = ensure_otel_metrics(New, Old), + LogsRes = ensure_otel_logs(New, Old), + TracesRes = ensure_otel_traces(New, Old), + case {MetricsRes, LogsRes, TracesRes} of + {ok, ok, ok} -> ok; Other -> {error, Other} end; post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> ok. -stop_all_otel_apps() -> - _ = application:stop(opentelemetry), - _ = application:stop(opentelemetry_experimental), - _ = application:stop(opentelemetry_experimental_api), - _ = application:stop(opentelemetry_exporter), - ok. - add_otel_log_handler() -> - ensure_otel_logs(emqx:get_config(?OPTL)). + ensure_otel_logs(emqx:get_config(?OPTL), #{}). 
remove_otel_log_handler() -> remove_handler_if_present(?OTEL_LOG_HANDLER_ID). @@ -93,23 +85,43 @@ otel_exporter(ExporterConf) -> %% Internal functions -ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) -> +ensure_otel_metrics( + #{metrics := MetricsConf, exporter := Exporter}, + #{metrics := MetricsConf, exporter := Exporter} +) -> + ok; +ensure_otel_metrics(#{metrics := #{enable := true}} = Conf, _Old) -> _ = emqx_otel_metrics:stop_otel(), - emqx_otel_metrics:start_otel(MetricsConf); -ensure_otel_metrics(#{metrics := #{enable := false}}) -> + emqx_otel_metrics:start_otel(Conf); +ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) -> emqx_otel_metrics:stop_otel(); -ensure_otel_metrics(_) -> +ensure_otel_metrics(_, _) -> ok. -ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) -> +ensure_otel_logs( + #{logs := LogsConf, exporter := Exporter}, + #{logs := LogsConf, exporter := Exporter} +) -> + ok; +ensure_otel_logs(#{logs := #{enable := true}} = Conf, _OldConf) -> ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID), - ok = ensure_log_apps(), - HandlerConf = tr_handler_conf(LogsConf), + HandlerConf = tr_handler_conf(Conf), %% NOTE: should primary logger level be updated if it's higher than otel log level? logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf); -ensure_otel_logs(#{logs := #{enable := false}}) -> +ensure_otel_logs(#{logs := #{enable := false}}, _OldConf) -> remove_handler_if_present(?OTEL_LOG_HANDLER_ID). +ensure_otel_traces( + #{traces := TracesConf, exporter := Exporter}, + #{traces := TracesConf, exporter := Exporter} +) -> + ok; +ensure_otel_traces(#{traces := #{enable := true}} = Conf, _OldConf) -> + _ = emqx_otel_trace:stop(), + emqx_otel_trace:start(Conf); +ensure_otel_traces(#{traces := #{enable := false}}, _OldConf) -> + emqx_otel_trace:stop(). 
+ remove_handler_if_present(HandlerId) -> case logger:get_handler_config(HandlerId) of {ok, _} -> @@ -118,24 +130,13 @@ remove_handler_if_present(HandlerId) -> ok end. -ensure_log_apps() -> - {ok, _} = application:ensure_all_started(opentelemetry_exporter), - {ok, _} = application:ensure_all_started(opentelemetry_experimental), - ok. - -maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) -> - stop_all_otel_apps(); -maybe_stop_all_otel_apps(_) -> - ok. - -tr_handler_conf(Conf) -> +tr_handler_conf(#{logs := LogsConf, exporter := ExporterConf}) -> #{ level := Level, max_queue_size := MaxQueueSize, exporting_timeout := ExportingTimeout, - scheduled_delay := ScheduledDelay, - exporter := ExporterConf - } = Conf, + scheduled_delay := ScheduledDelay + } = LogsConf, #{ level => Level, config => #{ diff --git a/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl b/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl index 9ca1c5deb..6e16e79d4 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl @@ -65,7 +65,7 @@ handle_info(_Msg, State) -> terminate(_Reason, _State) -> ok. -setup(Conf = #{enable := true}) -> +setup(Conf = #{metrics := #{enable := true}}) -> ensure_apps(Conf), create_metric_views(); setup(_Conf) -> @@ -73,11 +73,10 @@ setup(_Conf) -> ok. 
ensure_apps(Conf) -> - #{exporter := #{interval := ExporterInterval} = Exporter} = Conf, - {ok, _} = application:ensure_all_started(opentelemetry_exporter), - {ok, _} = application:ensure_all_started(opentelemetry), - {ok, _} = application:ensure_all_started(opentelemetry_experimental), - {ok, _} = application:ensure_all_started(opentelemetry_api_experimental), + #{ + exporter := Exporter, + metrics := #{interval := ExporterInterval} + } = Conf, _ = opentelemetry_experimental:stop_default_metrics(), ok = application:set_env( @@ -102,12 +101,12 @@ cleanup() -> safe_stop_default_metrics() -> try - _ = opentelemetry_experimental:stop_default_metrics() + _ = opentelemetry_experimental:stop_default_metrics(), + ok catch %% noramal scenario, metrics supervisor is not started exit:{noproc, _} -> ok - end, - ok. + end. create_metric_views() -> Meter = opentelemetry_experimental:get_meter(), diff --git a/apps/emqx_opentelemetry/src/emqx_otel_schema.erl b/apps/emqx_opentelemetry/src/emqx_otel_schema.erl index 927bc9dfd..bcd0b8dcf 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_schema.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_schema.erl @@ -30,15 +30,27 @@ upgrade_legacy_metrics(RawConf) -> case RawConf of #{<<"opentelemetry">> := Otel} -> - LegacyMetricsFields = [<<"enable">>, <<"exporter">>], - Otel1 = maps:without(LegacyMetricsFields, Otel), - Metrics = maps:with(LegacyMetricsFields, Otel), - case Metrics =:= #{} of - true -> - RawConf; - false -> - RawConf#{<<"opentelemetry">> => Otel1#{<<"metrics">> => Metrics}} - end; + Otel1 = + case maps:take(<<"enable">>, Otel) of + {MetricsEnable, OtelConf} -> + emqx_utils_maps:deep_put( + [<<"metrics">>, <<"enable">>], OtelConf, MetricsEnable + ); + error -> + Otel + end, + Otel2 = + case Otel1 of + #{<<"exporter">> := #{<<"interval">> := Interval} = Exporter} -> + emqx_utils_maps:deep_put( + [<<"metrics">>, <<"interval">>], + Otel1#{<<"exporter">> => maps:remove(<<"interval">>, Exporter)}, + Interval + ); + _ -> + Otel1 + 
end, + RawConf#{<<"opentelemetry">> => Otel2}; _ -> RawConf end. @@ -62,6 +74,20 @@ fields("opentelemetry") -> #{ desc => ?DESC(otel_logs) } + )}, + {traces, + ?HOCON( + ?R_REF("otel_traces"), + #{ + desc => ?DESC(otel_traces) + } + )}, + {exporter, + ?HOCON( + ?R_REF("otel_exporter"), + #{ + desc => ?DESC(otel_exporter) + } )} ]; fields("otel_metrics") -> @@ -75,10 +101,15 @@ fields("otel_metrics") -> desc => ?DESC(enable) } )}, - {exporter, + {interval, ?HOCON( - ?R_REF("otel_metrics_exporter"), - #{desc => ?DESC(exporter)} + emqx_schema:timeout_duration_ms(), + #{ + aliases => [scheduled_delay], + default => <<"10s">>, + desc => ?DESC(scheduled_delay), + importance => ?IMPORTANCE_HIDDEN + } )} ]; fields("otel_logs") -> @@ -127,34 +158,56 @@ fields("otel_logs") -> desc => ?DESC(scheduled_delay), importance => ?IMPORTANCE_HIDDEN } - )}, - {exporter, + )} + ]; +fields("otel_traces") -> + [ + {enable, ?HOCON( - ?R_REF("otel_logs_exporter"), + boolean(), #{ - desc => ?DESC(exporter), + default => false, + desc => ?DESC(enable), importance => ?IMPORTANCE_HIGH } + )}, + {max_queue_size, + ?HOCON( + pos_integer(), + #{ + default => 2048, + desc => ?DESC(max_queue_size), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {exporting_timeout, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"30s">>, + desc => ?DESC(exporting_timeout), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {scheduled_delay, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(scheduled_delay), + importance => ?IMPORTANCE_HIDDEN + } + )}, + {filter, + ?HOCON( + ?R_REF("trace_filter"), + #{ + desc => ?DESC(trace_filter), + importance => ?IMPORTANCE_MEDIUM + } )} ]; -fields("otel_metrics_exporter") -> - exporter_fields(metrics); -fields("otel_logs_exporter") -> - exporter_fields(logs); -fields("ssl_opts") -> - Schema = emqx_schema:client_ssl_opts_schema(#{}), - lists:keydelete("enable", 1, Schema). 
- -desc("opentelemetry") -> ?DESC(opentelemetry); -desc("exporter") -> ?DESC(exporter); -desc("otel_logs_exporter") -> ?DESC(exporter); -desc("otel_metrics_exporter") -> ?DESC(exporter); -desc("otel_logs") -> ?DESC(otel_logs); -desc("otel_metrics") -> ?DESC(otel_metrics); -desc("ssl_opts") -> ?DESC(exporter_ssl); -desc(_) -> undefined. - -exporter_fields(OtelSignal) -> +fields("otel_exporter") -> [ {endpoint, ?HOCON( @@ -183,21 +236,29 @@ exporter_fields(OtelSignal) -> importance => ?IMPORTANCE_LOW } )} - ] ++ exporter_extra_fields(OtelSignal). - -%% Let's keep it in exporter config for metrics, as it is different from -%% scheduled_delay_ms opt used for otel traces and logs -exporter_extra_fields(metrics) -> + ]; +fields("ssl_opts") -> + Schema = emqx_schema:client_ssl_opts_schema(#{}), + lists:keydelete("enable", 1, Schema); +fields("trace_filter") -> + %% More filters can be implemented in future, e.g. topic, clientid [ - {interval, + {trace_all, ?HOCON( - emqx_schema:timeout_duration_ms(), + boolean(), #{ - default => <<"10s">>, - required => true, - desc => ?DESC(scheduled_delay) + default => false, + desc => ?DESC(trace_all), + importance => ?IMPORTANCE_MEDIUM } )} - ]; -exporter_extra_fields(_OtelSignal) -> - []. + ]. + +desc("opentelemetry") -> ?DESC(opentelemetry); +desc("otel_exporter") -> ?DESC(otel_exporter); +desc("otel_logs") -> ?DESC(otel_logs); +desc("otel_metrics") -> ?DESC(otel_metrics); +desc("otel_traces") -> ?DESC(otel_traces); +desc("ssl_opts") -> ?DESC(exporter_ssl); +desc("trace_filter") -> ?DESC(trace_filter); +desc(_) -> undefined. 
diff --git a/apps/emqx_opentelemetry/src/emqx_otel_sup.erl b/apps/emqx_opentelemetry/src/emqx_otel_sup.erl index a823b2239..9d8165b21 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_sup.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_sup.erl @@ -41,8 +41,8 @@ init([]) -> period => 512 }, Children = - case emqx_conf:get([opentelemetry, metrics]) of - #{enable := false} -> []; - #{enable := true} = Conf -> [worker_spec(emqx_otel_metrics, Conf)] + case emqx_conf:get([opentelemetry]) of + #{metrics := #{enable := false}} -> []; + #{metrics := #{enable := true}} = Conf -> [worker_spec(emqx_otel_metrics, Conf)] end, {ok, {SupFlags, Children}}. diff --git a/apps/emqx_opentelemetry/src/emqx_otel_trace.erl b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl new file mode 100644 index 000000000..a3c73f206 --- /dev/null +++ b/apps/emqx_opentelemetry/src/emqx_otel_trace.erl @@ -0,0 +1,272 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_otel_trace). + +-behaviour(emqx_external_trace). + +-export([ + ensure_traces/1, + start/1, + stop/0 +]). + +-export([toggle_registered/1]). + +-export([ + trace_process_publish/3, + start_trace_send/2, + end_trace_send/1, + event/2 +]). + +-include_lib("emqx/include/emqx.hrl"). 
+-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("opentelemetry_api/include/otel_tracer.hrl"). + +-define(EMQX_OTEL_CTX, otel_ctx). +-define(IS_ENABLED, emqx_enable). +-define(USER_PROPERTY, 'User-Property'). + +-define(TRACE_ALL_KEY, {?MODULE, trace_all}). +-define(TRACE_ALL, persistent_term:get(?TRACE_ALL_KEY, false)). + +%%-------------------------------------------------------------------- +%% config +%%-------------------------------------------------------------------- + +-spec toggle_registered(boolean()) -> ok | {error, term()}. +toggle_registered(true = _Enable) -> + emqx_external_trace:register_provider(?MODULE); +toggle_registered(false = _Enable) -> + _ = emqx_external_trace:unregister_provider(?MODULE), + ok. + +-spec ensure_traces(map()) -> ok | {error, term()}. +ensure_traces(#{traces := #{enable := true}} = Conf) -> + start(Conf); +ensure_traces(_Conf) -> + ok. + +-spec start(map()) -> ok | {error, term()}. +start(#{traces := TracesConf, exporter := ExporterConf}) -> + #{ + max_queue_size := MaxQueueSize, + exporting_timeout := ExportingTimeout, + scheduled_delay := ScheduledDelay, + filter := #{trace_all := TraceAll} + } = TracesConf, + OtelEnv = [ + {bsp_scheduled_delay_ms, ScheduledDelay}, + {bsp_exporting_timeout_ms, ExportingTimeout}, + {bsp_max_queue_size, MaxQueueSize}, + {traces_exporter, emqx_otel_config:otel_exporter(ExporterConf)} + ], + set_trace_all(TraceAll), + ok = application:set_env([{opentelemetry, OtelEnv}]), + Res = assert_started(opentelemetry:start_default_tracer_provider()), + case Res of + ok -> + _ = toggle_registered(true), + Res; + Err -> + Err + end. + +-spec stop() -> ok. +stop() -> + _ = toggle_registered(false), + safe_stop_default_tracer(). 
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------

%% Runs ProcessFun on the packet. When the packet is eligible for tracing the
%% call is wrapped in a `process_message` span and the packet handed to
%% ProcessFun carries the OTel context; otherwise ProcessFun gets the packet
%% untouched.
-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
    Packet :: emqx_types:packet(),
    ChannelInfo :: emqx_external_trace:channel_info(),
    Res :: term().
trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
    case maybe_init_ctx(Packet) of
        false ->
            ProcessFun(Packet);
        ParentCtx ->
            process_in_span(ParentCtx, Packet, ChannelInfo, ProcessFun)
    end.

%% Starts the `process_message` span, attaches its context to the calling
%% process and always ends the span / clears the context afterwards.
process_in_span(ParentCtx, Packet, ChannelInfo, ProcessFun) ->
    EnabledCtx = otel_ctx:set_value(ParentCtx, ?IS_ENABLED, true),
    SpanOpts = #{
        attributes => maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo))
    },
    Span = otel_tracer:start_span(EnabledCtx, ?current_tracer, process_message, SpanOpts),
    SpanCtx = otel_tracer:set_current_span(EnabledCtx, Span),
    %% the context travels inside the packet so it can be propagated further
    TracedPacket = put_ctx_to_packet(SpanCtx, Packet),
    _ = otel_ctx:attach(SpanCtx),
    try
        ProcessFun(TracedPacket)
    after
        _ = ?end_span(),
        clear()
    end.

%% For every deliver whose message carries an OTel context, starts a
%% `send_published_message` span and stores the updated context back into the
%% message. Delivers without a context are returned as they are.
-spec start_trace_send(list(emqx_types:deliver()), emqx_external_trace:channel_info()) ->
    list(emqx_types:deliver()).
start_trace_send(Delivers, ChannelInfo) ->
    lists:map(fun(Deliver) -> start_send_span(Deliver, ChannelInfo) end, Delivers).

start_send_span({deliver, Topic, Msg} = Deliver, ChannelInfo) ->
    case get_ctx_from_msg(Msg) of
        Ctx when is_map(Ctx) ->
            SpanOpts = #{
                attributes => maps:merge(
                    msg_attributes(Msg), sub_channel_attributes(ChannelInfo)
                )
            },
            Span = otel_tracer:start_span(
                Ctx, ?current_tracer, send_published_message, SpanOpts
            ),
            SendCtx = otel_tracer:set_current_span(Ctx, Span),
            {deliver, Topic, put_ctx_to_msg(SendCtx, Msg)};
        _ ->
            Deliver
    end.

%% Ends the span referenced by the context stored in each packet, if any.
%% Accepts a single packet or a list of packets.
-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
end_trace_send(Packets) ->
    lists:foreach(fun end_send_span/1, packets_list(Packets)).

end_send_span(Packet) ->
    case get_ctx_from_packet(Packet) of
        Ctx when is_map(Ctx) ->
            otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
        _ ->
            ok
    end.
%% NOTE: adds an event only within an active span (Otel Ctx must be set in the calling process dict)
-spec event(opentelemetry:event_name(), opentelemetry:attributes_map()) -> ok.
event(Name, Attributes) ->
    case otel_ctx:get_value(?IS_ENABLED, false) of
        false ->
            ok;
        true ->
            ?add_event(Name, Attributes),
            ok
    end.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% Normalizes "one packet or a list of packets" into a list.
packets_list([]) ->
    [];
packets_list([_ | _] = Packets) ->
    Packets;
packets_list(Packet) ->
    [Packet].

%% Returns the OTel context to trace the packet under, or `false` when the
%% packet must not be traced.
maybe_init_ctx(#mqtt_packet{variable = Variable}) ->
    case should_trace_packet(Variable) of
        false ->
            false;
        true ->
            Ctx = extract_traceparent_from_packet(Variable),
            case should_trace_context(Ctx) of
                true -> Ctx;
                false -> false
            end
    end.

%% Builds a fresh context and, when the packet has User-Property entries,
%% lets the text-map propagator extract trace data from them.
extract_traceparent_from_packet(Packet) ->
    EmptyCtx = otel_ctx:new(),
    case emqx_packet:info(properties, Packet) of
        #{?USER_PROPERTY := UserProps} ->
            otel_propagator_text_map:extract_to(EmptyCtx, UserProps);
        _ ->
            EmptyCtx
    end.

%% A non-empty context is always traced; an empty one only when the
%% trace-all flag is set.
should_trace_context(RootCtx) ->
    case map_size(RootCtx) of
        0 -> ?TRACE_ALL;
        _ -> true
    end.

%% Packets published to $SYS/ topics are never traced.
should_trace_packet(Packet) ->
    Topic = emqx_packet:info(topic_name, Packet),
    not is_sys(Topic).

%% TODO: move to emqx_topic module?
is_sys(<<"$SYS/", _/binary>>) -> true;
is_sys(_) -> false.

%% Span attributes taken from a message.
msg_attributes(Msg) ->
    Topic = emqx_message:topic(Msg),
    From = emqx_message:from(Msg),
    #{
        'messaging.destination.name' => Topic,
        'messaging.client_id' => From
    }.

%% Span attributes taken from a PUBLISH packet.
packet_attributes(#mqtt_packet{variable = Variable}) ->
    Topic = emqx_packet:info(topic_name, Variable),
    #{'messaging.destination.name' => Topic}.

%% Span attributes taken from the channel info (`undefined` client id when
%% the channel info has no `clientid`).
channel_attributes(ChannelInfo) ->
    ClientId = maps:get(clientid, ChannelInfo, undefined),
    #{'messaging.client_id' => ClientId}.

%% Subscriber-side attributes; currently the same as channel_attributes/1.
sub_channel_attributes(ChannelInfo) ->
    channel_attributes(ChannelInfo).
%% Stores the OTel context under ?EMQX_OTEL_CTX in the message `extra` field,
%% keeping whatever other keys the map already holds.
put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) ->
    Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};
%% The extra field was not used previously and defaulted to an empty list,
%% so it is safe to overwrite it.
put_ctx_to_msg(OtelCtx, Msg) when is_record(Msg, message) ->
    Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}}.

%% Stores the OTel context under `internal_extra` in the PUBLISH packet
%% properties, preserving any other `internal_extra` entries.
put_ctx_to_packet(
    OtelCtx, #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
) ->
    Extra = maps:get(internal_extra, Props, #{}),
    Props1 = Props#{internal_extra => Extra#{?EMQX_OTEL_CTX => OtelCtx}},
    Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}}.

%% Returns the OTel context stored in the message, or `undefined`.
get_ctx_from_msg(#message{extra = Extra}) ->
    from_extra(Extra).

%% Returns the OTel context stored in a PUBLISH packet, or `undefined` for
%% any packet that carries no `internal_extra` property.
get_ctx_from_packet(#mqtt_packet{
    variable = #mqtt_packet_publish{properties = #{internal_extra := Extra}}
}) ->
    from_extra(Extra);
get_ctx_from_packet(_) ->
    undefined.

from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) ->
    OtelCtx;
from_extra(_) ->
    undefined.

%% Clears the OTel context of the calling process.
clear() ->
    otel_ctx:clear().

%% Stops the default tracer provider; a `noproc` exit is tolerated.
safe_stop_default_tracer() ->
    try
        _ = opentelemetry:stop_default_tracer_provider(),
        ok
    catch
        %% normal scenario, opentelemetry supervisor is not started
        exit:{noproc, _} -> ok
    end.

%% Maps the tracer provider start result to `ok | {error, Reason}`;
%% an already started provider counts as success.
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> {error, Reason}.

%% Writes the trace-all flag under the same key the ?TRACE_ALL reader uses,
%% so the writer and the reader cannot drift apart.
set_trace_all(TraceAll) ->
    persistent_term:put(?TRACE_ALL_KEY, TraceAll).
diff --git a/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl new file mode 100644 index 000000000..f829ca640 --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl @@ -0,0 +1,252 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_api_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-define(OTEL_API_PATH, emqx_mgmt_api_test_util:api_path(["opentelemetry"])).
-define(CONF_PATH, [opentelemetry]).

all() ->
    emqx_common_test_helpers:all(?MODULE).

%% Starts the applications the suite needs and stores the started apps and
%% the API auth header in the CT config.
init_per_suite(Config) ->
    %% This is called by emqx_machine in EMQX release
    emqx_otel_app:configure_otel_deps(),
    Apps = emqx_cth_suite:start(
        [
            emqx_conf,
            emqx_management,
            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
            emqx_opentelemetry
        ],
        #{work_dir => emqx_cth_suite:work_dir(Config)}
    ),
    Auth = auth_header(),
    [{suite_apps, Apps}, {auth, Auth} | Config].

end_per_suite(Config) ->
    emqx_cth_suite:stop(?config(suite_apps, Config)),
    emqx_config:delete_override_conf_files(),
    ok.

%% Resets all three signals to disabled before every test case. The update
%% result is asserted so that a failed reset surfaces here, in the setup,
%% instead of showing up later as a confusing test failure.
init_per_testcase(_TC, Config) ->
    {ok, _} = emqx_conf:update(
        ?CONF_PATH,
        #{
            <<"traces">> => #{<<"enable">> => false},
            <<"metrics">> => #{<<"enable">> => false},
            <<"logs">> => #{<<"enable">> => false}
        },
        #{}
    ),
    Config.

end_per_testcase(_TC, _Config) ->
    ok.

%% Creates the default API application and returns its auth header.
auth_header() ->
    {ok, API} = emqx_common_test_http:create_default_app(),
    emqx_common_test_http:auth_header(API).
%% GET returns the configuration with all three signals disabled
%% (the state set up by init_per_testcase/2).
t_get(Config) ->
    {ok, Body} = emqx_mgmt_api_test_util:request_api(
        get, ?OTEL_API_PATH, ?config(auth, Config)
    ),
    ?assertMatch(
        #{
            <<"traces">> := #{<<"enable">> := false},
            <<"metrics">> := #{<<"enable">> := false},
            <<"logs">> := #{<<"enable">> := false}
        },
        emqx_utils_json:decode(Body)
    ).

%% PUT can switch all three signals on and then off again; the effective
%% configuration is checked after each request.
t_put_enable_disable(Config) ->
    Put = fun(Req) ->
        emqx_mgmt_api_test_util:request_api(
            put, ?OTEL_API_PATH, "", ?config(auth, Config), Req
        )
    end,
    %% request body that sets `enable` of every signal to Flag
    ToggleAll = fun(Flag) ->
        #{
            <<"traces">> => #{<<"enable">> => Flag},
            <<"metrics">> => #{<<"enable">> => Flag},
            <<"logs">> => #{<<"enable">> => Flag}
        }
    end,

    ?assertMatch({ok, _}, Put(ToggleAll(true))),
    ?assertMatch(
        #{
            traces := #{enable := true},
            metrics := #{enable := true},
            logs := #{enable := true}
        },
        emqx:get_config(?CONF_PATH)
    ),

    ?assertMatch({ok, _}, Put(ToggleAll(false))),
    ?assertMatch(
        #{
            traces := #{enable := false},
            metrics := #{enable := false},
            logs := #{enable := false}
        },
        emqx:get_config(?CONF_PATH)
    ).
%% Every request body below must be rejected with HTTP 400.
%% The cases are table-driven: each payload is listed once (the original had
%% the out-of-range-port case twice) and the offending payload is attached
%% to the assertion so a failure names the request that was accepted.
t_put_invalid(Config) ->
    Auth = ?config(auth, Config),
    Path = ?OTEL_API_PATH,
    InvalidRequests = [
        %% exporter: empty, unknown-scheme and out-of-range-port endpoints
        #{<<"exporter">> => #{<<"endpoint">> => <<>>}},
        #{<<"exporter">> => #{<<"endpoint">> => <<"unknown://somehost.org">>}},
        #{<<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>}},
        %% exporter: unknown field / unknown protocol
        #{<<"exporter">> => #{<<"unknown_field">> => <<"foo">>}},
        #{<<"exporter">> => #{<<"protocol">> => <<"unknown">>}},
        %% per-signal sections: bad values and unknown fields
        #{<<"traces">> => #{<<"filter">> => #{<<"unknown_filter">> => <<"foo">>}}},
        #{<<"logs">> => #{<<"level">> => <<"foo">>}},
        #{<<"metrics">> => #{<<"interval">> => <<"foo">>}},
        #{<<"logs">> => #{<<"unknown_field">> => <<"foo">>}},
        %% unknown top-level field
        #{<<"unknown_field">> => <<"foo">>}
    ],
    lists:foreach(
        fun(Req) ->
            ?assertMatch(
                {error, {_, 400, _}},
                emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Req),
                #{invalid_request => Req}
            )
        end,
        InvalidRequests
    ).
%% Valid request bodies are accepted, and selected values are checked in the
%% resulting configuration.
t_put_valid(Config) ->
    Auth = ?config(auth, Config),
    Path = ?OTEL_API_PATH,
    Put = fun(Req) -> emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Req) end,

    %% an endpoint without a scheme is accepted and stored as an http URL
    ?assertMatch({ok, _}, Put(#{<<"exporter">> => #{<<"endpoint">> => <<"nohost.com">>}})),
    ?assertEqual(<<"http://nohost.com/">>, emqx:get_config(?CONF_PATH ++ [exporter, endpoint])),

    %% empty sections, alone and combined, are all valid
    ?assertMatch({ok, _}, Put(#{<<"exporter">> => #{}})),
    ?assertMatch({ok, _}, Put(#{})),
    ?assertMatch({ok, _}, Put(#{<<"traces">> => #{}})),
    ?assertMatch({ok, _}, Put(#{<<"logs">> => #{}})),
    ?assertMatch({ok, _}, Put(#{<<"metrics">> => #{}})),
    ?assertMatch(
        {ok, _},
        Put(#{<<"exporter">> => #{}, <<"traces">> => #{}, <<"logs">> => #{}, <<"metrics">> => #{}})
    ),

    %% a fully populated configuration
    ?assertMatch(
        {ok, _},
        Put(#{
            <<"exporter">> => #{
                <<"endpoint">> => <<"https://localhost:4317">>, <<"protocol">> => <<"grpc">>
            },
            <<"traces">> => #{
                <<"enable">> => true,
                <<"max_queue_size">> => 10,
                <<"exporting_timeout">> => <<"10s">>,
                <<"scheduled_delay">> => <<"20s">>,
                <<"filter">> => #{<<"trace_all">> => true}
            },
            <<"logs">> => #{
                <<"level">> => <<"warning">>,
                <<"max_queue_size">> => 100,
                <<"exporting_timeout">> => <<"10s">>,
                <<"scheduled_delay">> => <<"1s">>
            },
            <<"metrics">> => #{
                %% alias for "interval"
                <<"scheduled_delay">> => <<"15321ms">>
            }
        })
    ),
    %% alias check.
    %% This assertion used to sit inside the ?assertMatch above as its third
    %% (Comment) argument, which is only evaluated when the match fails, so
    %% the check never ran on success. It is now a statement of its own.
    ?assertEqual(15_321, emqx:get_config(?CONF_PATH ++ [metrics, interval])).
diff --git a/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl new file mode 100644 index 000000000..f5682dcad --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl @@ -0,0 +1,201 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_otel_schema_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% Backward compatibility suite for `upgrade_raw_conf/1`, +%% expected callback is `emqx_otel_schema:upgrade_legacy_metrics/1` + +-define(OLD_CONF_ENABLED, << + "\n" + "opentelemetry\n" + "{\n" + " enable = true\n" + "}\n" +>>). + +-define(OLD_CONF_DISABLED, << + "\n" + "opentelemetry\n" + "{\n" + " enable = false\n" + "}\n" +>>). + +-define(OLD_CONF_ENABLED_EXPORTER, << + "\n" + "opentelemetry\n" + "{\n" + " enable = true\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n" + "}\n" +>>). + +-define(OLD_CONF_DISABLED_EXPORTER, << + "\n" + "opentelemetry\n" + "{\n" + " enable = false\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n" + "}\n" +>>). 
-define(OLD_CONF_EXPORTER, <<
    "\n"
    "opentelemetry\n"
    "{\n"
    " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
    "}\n"
>>).

-define(OLD_CONF_EXPORTER_PARTIAL, <<
    "\n"
    "opentelemetry\n"
    "{\n"
    " exporter {endpoint = \"http://127.0.0.1:4317/\"}\n"
    "}\n"
>>).

-define(OLD_CONF_EXPORTER_PARTIAL1, <<
    "\n"
    "opentelemetry\n"
    "{\n"
    " exporter {interval = 3s}\n"
    "}\n"
>>).

%% Test case name => legacy configuration the node is started with.
-define(TESTS_CONF, #{
    t_old_conf_enabled => ?OLD_CONF_ENABLED,
    t_old_conf_disabled => ?OLD_CONF_DISABLED,
    t_old_conf_enabled_exporter => ?OLD_CONF_ENABLED_EXPORTER,
    t_old_conf_disabled_exporter => ?OLD_CONF_DISABLED_EXPORTER,
    t_old_conf_exporter => ?OLD_CONF_EXPORTER,
    t_old_conf_exporter_partial => ?OLD_CONF_EXPORTER_PARTIAL,
    t_old_conf_exporter_partial1 => ?OLD_CONF_EXPORTER_PARTIAL1
}).

all() ->
    emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
    Config.

end_per_suite(_Config) ->
    ok.

%% Each test case boots the apps with its own legacy configuration,
%% looked up by test case name in ?TESTS_CONF.
init_per_testcase(TC, Config) ->
    LegacyConf = maps:get(TC, ?TESTS_CONF),
    [{suite_apps, start_apps(TC, Config, LegacyConf)} | Config].

end_per_testcase(_TC, Config) ->
    emqx_cth_suite:stop(?config(suite_apps, Config)),
    emqx_config:delete_override_conf_files(),
    ok.

%% Starts emqx_conf with the given opentelemetry configuration, plus the
%% management and opentelemetry apps, in a per-test-case work dir.
start_apps(TC, Config, OtelConf) ->
    AppSpecs = [
        {emqx_conf, OtelConf},
        emqx_management,
        emqx_opentelemetry
    ],
    emqx_cth_suite:start(AppSpecs, #{work_dir => emqx_cth_suite:work_dir(TC, Config)}).

%% Legacy top-level `enable = true` ends up as `metrics.enable`; the legacy
%% keys themselves must not be present in the resulting configuration.
t_old_conf_enabled(_Config) ->
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{metrics := #{enable := true, interval := _}, exporter := #{endpoint := _}},
        OtelConf
    ),
    ?assertNot(maps:is_key(enable, OtelConf)),
    Exporter = maps:get(exporter, OtelConf),
    ?assertNot(maps:is_key(interval, Exporter)).
%% Legacy top-level `enable = false` ends up as `metrics.enable = false`.
t_old_conf_disabled(_Config) ->
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{metrics := #{enable := false, interval := _}, exporter := #{endpoint := _}},
        OtelConf
    ),
    assert_legacy_keys_absent(OtelConf).

%% Legacy `enable = true` together with a legacy exporter section.
t_old_conf_enabled_exporter(_Config) ->
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{
            metrics := #{enable := true, interval := 5000},
            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
        },
        OtelConf
    ),
    assert_legacy_keys_absent(OtelConf).

%% Legacy `enable = false` together with a legacy exporter section.
t_old_conf_disabled_exporter(_Config) ->
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{
            metrics := #{enable := false, interval := 5000},
            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
        },
        OtelConf
    ),
    assert_legacy_keys_absent(OtelConf).

%% Legacy exporter section only, no `enable`: metrics stay disabled.
t_old_conf_exporter(_Config) ->
    io:format(user, "TC running: ~p~n", [?FUNCTION_NAME]),
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{
            metrics := #{enable := false, interval := 5000},
            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
        },
        OtelConf
    ),
    assert_legacy_keys_absent(OtelConf).

%% Legacy exporter section with an endpoint but no interval.
t_old_conf_exporter_partial(_Config) ->
    OtelConf = emqx:get_config([opentelemetry]),
    ?assertMatch(
        #{
            metrics := #{enable := false, interval := _},
            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
        },
        OtelConf
    ),
    assert_legacy_keys_absent(OtelConf).

%% Neither the legacy top-level `enable` nor the legacy `exporter.interval`
%% may be present in the given configuration.
assert_legacy_keys_absent(OtelConf) ->
    ?assertNot(erlang:is_map_key(enable, OtelConf)),
    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+ +t_old_conf_exporter_partial1(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := false, interval := 3000}, + exporter := #{endpoint := _} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). diff --git a/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl new file mode 100644 index 000000000..88917d7e3 --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl @@ -0,0 +1,431 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_otel_trace_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(OTEL_SERVICE_NAME, "emqx"). +-define(CONF_PATH, [opentelemetry]). + +%% How to run it locally: +%% 1. Uncomment networks in .ci/docker-compose-file/docker-compose-otel.yaml, +%% Uncomment OTLP gRPC ports mappings for otel-collector and otel-collector-tls services. +%% Uncomment jaeger-all-in-one ports mapping. +%% 2.
%%    Start deps services:
%%    DOCKER_USER="$(id -u)" docker-compose -f .ci/docker-compose-file/docker-compose-otel.yaml up
%% 3. Run tests with special env variables:
%%    PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%%    OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%%    make "apps/emqx_opentelemetry-ct"
%%    Or run only this suite:
%%    PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%%    OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%%    ./rebar3 ct -v --readable=true --name 'test@127.0.0.1' \
%%    --suite apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl

%% The same test cases run twice: against the plain collector (tcp group)
%% and against the TLS collector (tls group).
all() ->
    [
        {group, tcp},
        {group, tls}
    ].

groups() ->
    TestCases = emqx_common_test_helpers:all(?MODULE),
    [{Group, TestCases} || Group <- [tcp, tls]].

init_per_suite(Config) ->
    %% This is called by emqx_machine in EMQX release
    emqx_otel_app:configure_otel_deps(),
    %% No release name during the test case, we need a reliable service name to query Jaeger
    os:putenv("OTEL_SERVICE_NAME", ?OTEL_SERVICE_NAME),
    [{jaeger_url, os:getenv("JAEGER_URL", "http://jaeger.emqx.net:16686")} | Config].

end_per_suite(_) ->
    os:unsetenv("OTEL_SERVICE_NAME"),
    ok.

%% Each group picks its collector URL from an env variable (with a default)
%% and the path of the log file exported for that group.
init_per_group(tcp = Group, Config) ->
    Url = os:getenv("OTEL_COLLECTOR_URL", "http://otel-collector.emqx.net:4317"),
    with_collector(Group, Url, Config);
init_per_group(tls = Group, Config) ->
    Url = os:getenv(
        "OTEL_COLLECTOR_TLS_URL", "https://otel-collector-tls.emqx.net:4317"
    ),
    with_collector(Group, Url, Config).

%% Prepends the collector URL and the logs exporter file path to Config.
with_collector(Group, CollectorUrl, Config) ->
    [
        {otel_collector_url, CollectorUrl},
        {logs_exporter_file_path, logs_exporter_file_path(Group, Config)}
        | Config
    ].

end_per_group(_Group, _Config) ->
    ok.
%% t_distributed_trace runs against a cluster; every other case runs against
%% the apps started on the test node itself.
init_per_testcase(t_distributed_trace = TC, Config) ->
    [{cluster, cluster(TC, Config)} | Config];
init_per_testcase(TC, Config) ->
    WorkDir = emqx_cth_suite:work_dir(TC, Config),
    Apps = emqx_cth_suite:start(apps_spec(), #{work_dir => WorkDir}),
    [{suite_apps, Apps} | Config].

end_per_testcase(t_distributed_trace, Config) ->
    emqx_cth_cluster:stop(?config(cluster, Config)),
    emqx_config:delete_override_conf_files(),
    ok;
end_per_testcase(_TC, Config) ->
    emqx_cth_suite:stop(?config(suite_apps, Config)),
    emqx_config:delete_override_conf_files(),
    ok.

%% Sampled publishes must show up in Jaeger with the expected number of
%% spans; publishes with a not-sampled traceparent must not show up at all.
t_trace(Config) ->
    HostPort = mqtt_host_port(),

    {ok, _} = emqx_conf:update(?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}),

    TestName = atom_to_binary(?FUNCTION_NAME),
    Topic = <<"t/trace/test/", TestName/binary>>,
    TopicNoSubs = <<"t/trace/test/nosub/", TestName/binary>>,

    Sub1 = connect(HostPort, <<"sub1">>),
    {ok, _, [0]} = emqtt:subscribe(Sub1, Topic),
    Sub2 = connect(HostPort, <<"sub2">>),
    {ok, _, [0]} = emqtt:subscribe(Sub2, Topic),
    Pub = connect(HostPort, <<"pub">>),

    Sampled = traceparent(true),
    NotSampled = traceparent(false),
    ok = emqtt:publish(Pub, Topic, props(Sampled), <<"must be traced">>, []),
    ok = emqtt:publish(Pub, Topic, props(NotSampled), <<"must not be traced">>, []),

    SampledNoSub = traceparent(true),
    NotSampledNoSub = traceparent(false),
    ok = emqtt:publish(Pub, TopicNoSubs, props(SampledNoSub), <<"must be traced">>, []),
    ok = emqtt:publish(
        Pub, TopicNoSubs, props(NotSampledNoSub), <<"must not be traced">>, []
    ),

    TracesExported = fun() ->
        {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
        [Trace] = filter_traces(trace_id(Sampled), Traces),
        [] = filter_traces(trace_id(NotSampled), Traces),
        [TraceNoSub] = filter_traces(trace_id(SampledNoSub), Traces),
        [] = filter_traces(trace_id(NotSampledNoSub), Traces),

        #{<<"spans">> := Spans, <<"processes">> := _} = Trace,
        #{<<"spans">> := SpansNoSub, <<"processes">> := _} = TraceNoSub,
        %% 2 sub spans and 1 publish process span for the subscribed topic,
        %% only 1 publish process span for the topic without subscribers
        length(Spans) =:= 3 andalso length(SpansNoSub) =:= 1
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, TracesExported, 10_000)
    ),
    stop_conns([Sub1, Sub2, Pub]).

%% With traces disabled, messages (and their traceparent user property) must
%% still be delivered, even when the provider is registered without a running
%% tracer.
t_trace_disabled(_Config) ->
    ?assertNot(emqx:get_config(?CONF_PATH ++ [traces, enable])),
    %% Tracer must be actually disabled
    ?assertEqual({otel_tracer_noop, []}, opentelemetry:get_tracer()),
    ?assertEqual(undefined, emqx_external_trace:provider()),

    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,

    Sub = connect(mqtt_host_port(), <<"sub">>),
    {ok, _, [0]} = emqtt:subscribe(Sub, Topic),
    Pub = connect(mqtt_host_port(), <<"pub">>),

    %% traceparent must be propagated by EMQX even if internal otel trace is disabled
    FirstParent = traceparent(true),
    emqtt:publish(Pub, Topic, props(FirstParent), <<>>, []),
    assert_traceparent_delivered(Topic, FirstParent),

    %% if otel trace is registered but is actually not running, EMQX must work fine
    %% and the message must be delivered to the subscriber
    ok = emqx_otel_trace:toggle_registered(true),
    SecondParent = traceparent(true),
    emqtt:publish(Pub, Topic, props(SecondParent), <<>>, []),
    assert_traceparent_delivered(Topic, SecondParent),

    stop_conns([Sub, Pub]).

%% Waits (up to 10 s) for a publish on Topic in the calling process mailbox
%% and checks that its only user property is the expected traceparent.
assert_traceparent_delivered(Topic, ExpectedParent) ->
    receive
        {publish, #{topic := Topic, properties := Props}} ->
            #{'User-Property' := [{<<"traceparent">>, Received}]} = Props,
            ?assertEqual(ExpectedParent, Received)
    after 10_000 ->
        ct:fail("published_message_not_received")
    end.

%% With `trace_all` enabled, a publish without any traceparent must still
%% produce a trace.
t_trace_all(Config) ->
    TraceAllConf = emqx_utils_maps:deep_put(
        [<<"traces">>, <<"filter">>],
        enabled_trace_conf(Config),
        #{<<"trace_all">> => true}
    ),
    {ok, _} = emqx_conf:update(?CONF_PATH, TraceAllConf, #{override_to => cluster}),

    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
    ClientId = <<"pub-", (integer_to_binary(erlang:system_time(nanosecond)))/binary>>,
    Pub = connect(mqtt_host_port(), ClientId),
    emqtt:publish(Pub, Topic, #{}, <<>>, []),

    IsClientIdTag = fun(#{<<"key">> := Key, <<"value">> := Value}) ->
        Key =:= <<"messaging.client_id">> andalso Value =:= ClientId
    end,
    %% Only one span per trace is expected as there are no subscribers
    IsPublisherTrace = fun
        (#{<<"spans">> := [#{<<"tags">> := Tags}]}) -> lists:any(IsClientIdTag, Tags);
        (#{<<"spans">> := _}) -> false
    end,
    TraceExported = fun() ->
        {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
        %% Expecting exactly 1 matching trace
        length(lists:filter(IsPublisherTrace, Traces)) =:= 1
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, TraceExported, 10_000)
    ),
    stop_conns([Pub]).
%% One publisher on the replicant and one subscriber on each cluster node:
%% the sampled publish must yield a single trace with one process_message
%% span from the replicant and one send_published_message span per node.
t_distributed_trace(Config) ->
    [Core1, Core2, Repl] = Cluster = ?config(cluster, Config),
    {ok, _} = rpc:call(
        Core1,
        emqx_conf,
        update,
        [?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}]
    ),
    Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,

    %% connect a subscriber to the given node and subscribe it to Topic
    Subscribe = fun({Node, ClientId}) ->
        Conn = connect(mqtt_host_port(Node), ClientId),
        {ok, _, [0]} = emqtt:subscribe(Conn, Topic),
        Conn
    end,
    SubConns = [
        Subscribe(Sub)
     || Sub <- [{Core1, <<"sub1">>}, {Core2, <<"sub2">>}, {Repl, <<"sub3">>}]
    ],

    PubConn = connect(mqtt_host_port(Repl), <<"pub">>),

    Sampled = traceparent(true),
    NotSampled = traceparent(false),

    ok = emqtt:publish(PubConn, Topic, props(Sampled), <<"must be traced">>, []),
    ok = emqtt:publish(PubConn, Topic, props(NotSampled), <<"must not be traced">>, []),

    ExpectedNodes = lists:sort([atom_to_binary(Node) || Node <- Cluster]),
    TraceExported = fun() ->
        {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
        [Trace] = filter_traces(trace_id(Sampled), Traces),
        [] = filter_traces(trace_id(NotSampled), Traces),

        #{<<"spans">> := Spans, <<"processes">> := Procs} = Trace,

        %% 3 sub spans and 1 publish process span
        [_, _, _, _] = Spans,
        [_, _, _] = SendSpans = filter_spans(<<"send_published_message">>, Spans),
        [PubSpan] = filter_spans(<<"process_message">>, Spans),

        SendNodes = lists:sort([span_node(Span, Procs) || Span <- SendSpans]),
        atom_to_binary(Repl) =:= span_node(PubSpan, Procs) andalso
            ExpectedNodes =:= SendNodes
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, TraceExported, 10_000)
    ),
    stop_conns(SubConns ++ [PubConn]).
%% Keeping this test in this SUITE as there is no separate module for logs
t_log(Config) ->
    Level = emqx_logger:get_primary_log_level(),
    LogsConf = #{
        <<"logs">> => #{
            <<"enable">> => true,
            <<"level">> => atom_to_binary(Level),
            <<"scheduled_delay">> => <<"20ms">>
        },
        <<"exporter">> => exporter_conf(Config)
    },
    {ok, _} = emqx_conf:update(?CONF_PATH, LogsConf, #{override_to => cluster}),

    %% Ids are only needed for matching logs in the file exported by otel-collector
    StructuredId = integer_to_binary(otel_id_generator:generate_trace_id()),
    ?SLOG(Level, #{msg => "otel_test_log_message", id => StructuredId}),
    PlainId = integer_to_binary(otel_id_generator:generate_trace_id()),
    logger:Level("Ordinary log message, id: ~p", [PlainId]),

    %% both ids must eventually appear in the exported log file
    BothExported = fun() ->
        {ok, Logs} = file:read_file(?config(logs_exporter_file_path, Config)),
        lists:all(
            fun(Id) -> binary:match(Logs, Id) =/= nomatch end,
            [StructuredId, PlainId]
        )
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, BothExported, 10_000)
    ).

%% Path of the collector's exported log file for the given group, relative
%% to the project directory.
logs_exporter_file_path(Group, Config) ->
    ProjectDir = project_dir(Config),
    filename:join([ProjectDir, logs_exporter_filename(Group)]).

%% The part of the CT priv_dir path that precedes the "_build" component.
project_dir(Config) ->
    PrivDirParts = filename:split(?config(priv_dir, Config)),
    IsOutsideBuild = fun(Part) -> Part =/= "_build" end,
    filename:join(lists:takewhile(IsOutsideBuild, PrivDirParts)).

logs_exporter_filename(tcp) ->
    ".ci/docker-compose-file/otel/otel-collector.json";
logs_exporter_filename(tls) ->
    ".ci/docker-compose-file/otel/otel-collector-tls.json".

%% Raw opentelemetry configuration with traces enabled and the exporter
%% pointed at the collector of the current group.
enabled_trace_conf(TcConfig) ->
    Traces = #{
        <<"enable">> => true,
        <<"scheduled_delay">> => <<"50ms">>
    },
    #{
        <<"traces">> => Traces,
        <<"exporter">> => exporter_conf(TcConfig)
    }.

exporter_conf(TcConfig) ->
    CollectorUrl = ?config(otel_collector_url, TcConfig),
    #{<<"endpoint">> => CollectorUrl}.

%% Value of the `service.instance.id` tag of the process the span belongs to.
span_node(#{<<"processID">> := ProcId}, Procs) ->
    #{ProcId := #{<<"tags">> := ProcTags}} = Procs,
    IsInstanceId = fun(#{<<"key">> := Key}) -> Key =:= <<"service.instance.id">> end,
    [#{<<"value">> := Node}] = lists:filter(IsInstanceId, ProcTags),
    Node.
%% Extracts the 32-character trace id from a "00-<trace id>-..." traceparent.
trace_id(<<"00-", TraceId:32/binary, _/binary>>) ->
    TraceId.

%% Keeps only the traces whose `traceID` equals TraceId.
filter_traces(TraceId, Traces) ->
    lists:filter(fun(#{<<"traceID">> := TrId}) -> TrId =:= TraceId end, Traces).

%% Keeps only the spans whose `operationName` equals OpName.
filter_spans(OpName, Spans) ->
    lists:filter(fun(#{<<"operationName">> := Name}) -> Name =:= OpName end, Spans).

%% Queries Jaeger for the traces of ?OTEL_SERVICE_NAME.
%% Returns `{ok, DecodedBody}` on HTTP 200; any other httpc result is logged
%% and returned unchanged.
get_jaeger_traces(JaegerBaseURL) ->
    case httpc:request(JaegerBaseURL ++ "/api/traces?service=" ++ ?OTEL_SERVICE_NAME) of
        {ok, {{_, 200, _}, _, RespBody}} ->
            {ok, emqx_utils_json:decode(RespBody)};
        Err ->
            %% ct:pal/2 takes a list of format arguments; the bare term used
            %% to be passed here, which broke the logging on the error path
            ct:pal("Jager error: ~p", [Err]),
            Err
    end.

%% Stops every emqtt connection in the list.
stop_conns(Conns) ->
    lists:foreach(fun emqtt:stop/1, Conns).

%% MQTT properties carrying the traceparent as the only user property.
props(TraceParent) ->
    #{'User-Property' => [{<<"traceparent">>, TraceParent}]}.

%% Builds a traceparent binary "00-<32 hex digits>-<16 hex digits>-<flags>"
%% from freshly generated trace and span ids; flags are "01" when sampled
%% and "00" otherwise.
traceparent(IsSampled) ->
    TraceId = otel_id_generator:generate_trace_id(),
    SpanId = otel_id_generator:generate_span_id(),
    {ok, TraceIdHexStr} = otel_utils:format_binary_string("~32.16.0b", [TraceId]),
    {ok, SpanIdHexStr} = otel_utils:format_binary_string("~16.16.0b", [SpanId]),
    TraceFlags =
        case IsSampled of
            true -> <<"01">>;
            false -> <<"00">>
        end,
    <<"00-", TraceIdHexStr/binary, "-", SpanIdHexStr/binary, "-", TraceFlags/binary>>.

%% Starts an MQTT v5 client with the given client id, connects it and
%% returns its pid.
connect({Host, Port}, ClientId) ->
    {ok, ConnPid} = emqtt:start_link([
        {proto_ver, v5},
        {host, Host},
        {port, Port},
        {clientid, ClientId}
    ]),
    {ok, _} = emqtt:connect(ConnPid),
    ConnPid.

%% Bind address of the default TCP listener on the local node.
mqtt_host_port() ->
    emqx:get_config([listeners, tcp, default, bind]).

%% Bind address of the default TCP listener on a remote node.
mqtt_host_port(Node) ->
    rpc:call(Node, emqx, get_config, [[listeners, tcp, default, bind]]).

%% Starts a cluster of two core nodes and one replicant, all running the
%% apps from apps_spec/0, and returns the started nodes.
cluster(TC, Config) ->
    Nodes = emqx_cth_cluster:start(
        [
            {otel_trace_core1, #{role => core, apps => apps_spec()}},
            {otel_trace_core2, #{role => core, apps => apps_spec()}},
            {otel_trace_replicant, #{role => replicant, apps => apps_spec()}}
        ],
        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
    ),
    Nodes.

%% Applications started for every test case / cluster node.
apps_spec() ->
    [
        emqx,
        emqx_conf,
        emqx_management,
        emqx_opentelemetry
    ].
diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index a0d196fa9..cbb452c41 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -36,8 +36,8 @@ payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer(), - %% not used so far, for future extension - extra = [] :: term() + %% Miscellaneous extensions, currently used for OpenTelemetry context propagation + extra = #{} :: term() }). -endif. diff --git a/changes/ce/feat-11984.en.md b/changes/ce/feat-11984.en.md new file mode 100644 index 000000000..e4e0a7717 --- /dev/null +++ b/changes/ce/feat-11984.en.md @@ -0,0 +1 @@ +Implemented Open Telemetry distributed tracing feature. diff --git a/mix.exs b/mix.exs index 2e236bc96..8e1bd2b76 100644 --- a/mix.exs +++ b/mix.exs @@ -98,37 +98,7 @@ defmodule EMQXUmbrella.MixProject do # set by hackney (dependency) {:ssl_verify_fun, "1.1.7", override: true}, {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true}, - {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}, - {:opentelemetry_api, - github: "emqx/opentelemetry-erlang", - sparse: "apps/opentelemetry_api", - tag: "v1.4.2-emqx", - override: true, - runtime: false}, - {:opentelemetry, - github: "emqx/opentelemetry-erlang", - sparse: "apps/opentelemetry", - tag: "v1.4.2-emqx", - override: true, - runtime: false}, - {:opentelemetry_api_experimental, - github: "emqx/opentelemetry-erlang", - sparse: "apps/opentelemetry_api_experimental", - tag: "v1.4.2-emqx", - override: true, - runtime: false}, - {:opentelemetry_experimental, - github: "emqx/opentelemetry-erlang", - sparse: "apps/opentelemetry_experimental", - tag: "v1.4.2-emqx", - override: true, - runtime: false}, - {:opentelemetry_exporter, - github: "emqx/opentelemetry-erlang", - sparse: "apps/opentelemetry_exporter", - tag: "v1.4.2-emqx", - override: true, - runtime: false} + {:quickrand, github: "okeuday/quickrand", tag: 
"v2.0.6", override: true} ] ++ emqx_apps(profile_info, version) ++ enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() diff --git a/rebar.config b/rebar.config index 50a3c9867..51ec23a2e 100644 --- a/rebar.config +++ b/rebar.config @@ -84,14 +84,6 @@ %% in conflict by erlavro and rocketmq , {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}} , {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}} - %% trace - , {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api"}} - , {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry"}} - %% log metrics - , {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_experimental"}} - , {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api_experimental"}} - %% export - , {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_exporter"}} , {ssl_verify_fun, "1.1.7"} ]}. diff --git a/rel/i18n/emqx_otel_schema.hocon b/rel/i18n/emqx_otel_schema.hocon index 9e59b2a76..eca20b457 100644 --- a/rel/i18n/emqx_otel_schema.hocon +++ b/rel/i18n/emqx_otel_schema.hocon @@ -11,11 +11,14 @@ otel_logs.label: "Open Telemetry Logs" otel_metrics.desc: "Open Telemetry Metrics configuration." otel_metrics.label: "Open Telemetry Metrics" +otel_traces.desc: "Open Telemetry Traces configuration." +otel_traces.label: "Open Telemetry Traces" + enable.desc: "Enable or disable Open Telemetry signal." enable.label: "Enable." -exporter.desc: "Open Telemetry Exporter" -exporter.label: "Exporter" +otel_exporter.desc: "Open Telemetry Exporter" +otel_exporter.label: "Exporter" max_queue_size.desc: """The maximum queue size. 
After the size is reached Open Telemetry signals are dropped.""" @@ -41,4 +44,13 @@ otel_log_handler_level.desc: """The log level of the Open Telemetry log handler.""" otel_log_handler_level.label: "Log Level" +trace_filter.desc: "Open Telemetry Trace Filter configuration" +trace_filter.label: "Trace Filter" + +trace_all.desc: +"""If enabled, all published messages are traced, a new trace ID is generated if it can't be extracted from the message. +Otherwise, only messages published with trace context are traced. Disabled by default.""" +trace_all.label: "Trace All" + + } diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 4de3b4d7c..7959581a9 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -243,6 +243,9 @@ for dep in ${CT_DEPS}; do ldap) FILES+=( '.ci/docker-compose-file/docker-compose-ldap.yaml' ) ;; + otel) + FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1