From 85441fda0f2f334cfc81e4d3653911418ce2cf26 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 29 Nov 2023 18:27:44 +0200 Subject: [PATCH] test(emqx_opentelemetry): add trace test suite --- .../docker-compose-otel.yaml | 69 +++ .ci/docker-compose-file/otel/.gitignore | 6 + .../otel/otel-collector-config-tls.yaml | 52 +++ .../otel/otel-collector-config.yaml | 51 +++ apps/emqx/src/emqx_external_trace.erl | 4 + apps/emqx_opentelemetry/docker-ct | 1 + apps/emqx_opentelemetry/src/emqx_otel_api.erl | 21 +- .../test/emqx_otel_api_SUITE.erl | 252 ++++++++++ .../test/emqx_otel_schema_SUITE.erl | 201 ++++++++ .../test/emqx_otel_trace_SUITE.erl | 431 ++++++++++++++++++ scripts/ct/run.sh | 3 + 11 files changed, 1078 insertions(+), 13 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-otel.yaml create mode 100644 .ci/docker-compose-file/otel/.gitignore create mode 100644 .ci/docker-compose-file/otel/otel-collector-config-tls.yaml create mode 100644 .ci/docker-compose-file/otel/otel-collector-config.yaml create mode 100644 apps/emqx_opentelemetry/docker-ct create mode 100644 apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl create mode 100644 apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl create mode 100644 apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl diff --git a/.ci/docker-compose-file/docker-compose-otel.yaml b/.ci/docker-compose-file/docker-compose-otel.yaml new file mode 100644 index 000000000..a2d1bfae7 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-otel.yaml @@ -0,0 +1,69 @@ +version: '3.9' + +services: + jaeger-all-in-one: + image: jaegertracing/all-in-one:1.51.0 + container_name: jaeger.emqx.net + hostname: jaeger.emqx.net + networks: + - emqx_bridge + restart: always +# ports: +# - "16686:16686" + user: "${DOCKER_USER:-root}" + + # Collector + otel-collector: + image: otel/opentelemetry-collector:0.90.0 + container_name: otel-collector.emqx.net + hostname: otel-collector.emqx.net + networks: + - emqx_bridge + restart: always + command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"] + volumes: + - ./otel:/etc/ +# ports: +# - "1888:1888" # pprof extension +# - "8888:8888" # Prometheus metrics exposed by the collector +# - "8889:8889" # Prometheus exporter metrics +# - "13133:13133" # health_check extension +# - "4317:4317" # OTLP gRPC receiver +# - "4318:4318" # OTLP http receiver +# - "55679:55679" # zpages extension + depends_on: + - jaeger-all-in-one + user: "${DOCKER_USER:-root}" + + +# Collector + otel-collector-tls: + image: otel/opentelemetry-collector:0.90.0 + container_name: otel-collector-tls.emqx.net + hostname: otel-collector-tls.emqx.net + networks: + - emqx_bridge + restart: always + command: ["--config=/etc/otel-collector-config-tls.yaml", "${OTELCOL_ARGS}"] + volumes: + - ./otel:/etc/ + - ./certs:/etc/certs + # ports: + # - "14317:4317" # OTLP gRPC receiver + depends_on: + - jaeger-all-in-one + user: "${DOCKER_USER:-root}" + +#networks: +# emqx_bridge: +# driver: bridge +# name: emqx_bridge +# enable_ipv6: true +# ipam: +# driver: default +# config: +# - subnet: 172.100.239.0/24 +# gateway: 172.100.239.1 +# - subnet: 2001:3200:3200::/64 +# gateway: 2001:3200:3200::1 +# diff --git a/.ci/docker-compose-file/otel/.gitignore b/.ci/docker-compose-file/otel/.gitignore new file mode 100644 index 000000000..98dacbd74 --- /dev/null +++ b/.ci/docker-compose-file/otel/.gitignore @@ -0,0 +1,6 @@ +certs +hostname +hosts +otel-collector.json +otel-collector-tls.json +resolv.conf diff --git a/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml b/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml new file mode 100644 index 000000000..9163fc724 --- /dev/null +++ b/.ci/docker-compose-file/otel/otel-collector-config-tls.yaml @@ -0,0 +1,52 @@ +receivers: + otlp: + protocols: + grpc: + tls: + ca_file: /etc/certs/ca.crt + cert_file: /etc/certs/server.crt + key_file: /etc/certs/server.key + http: + tls: + ca_file: /etc/certs/ca.crt + cert_file: /etc/certs/server.crt + key_file: /etc/certs/server.key + +exporters: + logging: + verbosity: detailed + otlp: + endpoint: jaeger.emqx.net:4317 + tls: + insecure: true + debug: + verbosity: detailed + file: + path: /etc/otel-collector-tls.json + + +processors: + batch: + # send data immediately + timeout: 0 + +extensions: + health_check: + zpages: + endpoint: :55679 + +service: + extensions: [zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging, otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file] diff --git a/.ci/docker-compose-file/otel/otel-collector-config.yaml b/.ci/docker-compose-file/otel/otel-collector-config.yaml new file mode 100644 index 000000000..6d6650139 --- /dev/null +++ b/.ci/docker-compose-file/otel/otel-collector-config.yaml @@ -0,0 +1,51 @@ +receivers: + otlp: + protocols: + grpc: + tls: +# ca_file: /etc/ca.pem +# cert_file: /etc/server.pem +# key_file: /etc/server.key + http: + tls: +# ca_file: /etc/ca.pem +# cert_file: /etc/server.pem +# key_file: /etc/server.key + +exporters: + logging: + verbosity: detailed + otlp: + endpoint: jaeger.emqx.net:4317 + tls: + insecure: true + debug: + verbosity: detailed + file: + path: /etc/otel-collector.json + +processors: + batch: + # send data immediately + timeout: 0 + +extensions: + health_check: + zpages: + endpoint: :55679 + +service: + extensions: [zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging, otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file] diff --git a/apps/emqx/src/emqx_external_trace.erl b/apps/emqx/src/emqx_external_trace.erl index fb32b8248..b03e7139a 100644 --- a/apps/emqx/src/emqx_external_trace.erl +++ b/apps/emqx/src/emqx_external_trace.erl @@ -28,6 +28,7 @@ -callback event(EventName :: term(), Attributes :: term()) -> ok. -export([ + provider/0, register_provider/1, unregister_provider/1, trace_process_publish/3, @@ -71,6 +72,9 @@ unregister_provider(Module) -> {error, not_registered} end. +-spec provider() -> module() | undefined. +provider() -> + persistent_term:get(?PROVIDER, undefined). %%-------------------------------------------------------------------- %% trace API %%-------------------------------------------------------------------- diff --git a/apps/emqx_opentelemetry/docker-ct b/apps/emqx_opentelemetry/docker-ct new file mode 100644 index 000000000..8f7569d06 --- /dev/null +++ b/apps/emqx_opentelemetry/docker-ct @@ -0,0 +1 @@ +otel diff --git a/apps/emqx_opentelemetry/src/emqx_otel_api.erl b/apps/emqx_opentelemetry/src/emqx_otel_api.erl index d8c76ebcf..f14bdb00b 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_api.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_api.erl @@ -103,24 +103,19 @@ otel_config_schema() -> otel_config_example() -> #{ + exporter => #{ + endpoint => "http://localhost:4317", + ssl_options => #{} + }, logs => #{ enable => true, - exporter => #{ - endpoint => "http://localhost:4317", - ssl_options => #{ - enable => false - } - }, level => warning }, metrics => #{ + enable => true + }, + traces => #{ enable => true, - exporter => #{ - endpoint => "http://localhost:4317", - interval => "10s", - ssl_options => #{ - enable => false - } - } + filter => #{trace_all => false} } }. diff --git a/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl new file mode 100644 index 000000000..f829ca640 --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl @@ -0,0 +1,252 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_otel_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(OTEL_API_PATH, emqx_mgmt_api_test_util:api_path(["opentelemetry"])). +-define(CONF_PATH, [opentelemetry]). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% This is called by emqx_machine in EMQX release + emqx_otel_app:configure_otel_deps(), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}, + emqx_opentelemetry + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + Auth = auth_header(), + [{suite_apps, Apps}, {auth, Auth} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(), + ok. + +init_per_testcase(_TC, Config) -> + emqx_conf:update( + ?CONF_PATH, + #{ + <<"traces">> => #{<<"enable">> => false}, + <<"metrics">> => #{<<"enable">> => false}, + <<"logs">> => #{<<"enable">> => false} + }, + #{} + ), + Config. + +end_per_testcase(_TC, _Config) -> + ok. + +auth_header() -> + {ok, API} = emqx_common_test_http:create_default_app(), + emqx_common_test_http:auth_header(API). + +t_get(Config) -> + Auth = ?config(auth, Config), + Path = ?OTEL_API_PATH, + {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ?assertMatch( + #{ + <<"traces">> := #{<<"enable">> := false}, + <<"metrics">> := #{<<"enable">> := false}, + <<"logs">> := #{<<"enable">> := false} + }, + emqx_utils_json:decode(Resp) + ). + +t_put_enable_disable(Config) -> + Auth = ?config(auth, Config), + Path = ?OTEL_API_PATH, + EnableAllReq = #{ + <<"traces">> => #{<<"enable">> => true}, + <<"metrics">> => #{<<"enable">> => true}, + <<"logs">> => #{<<"enable">> => true} + }, + ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, EnableAllReq)), + ?assertMatch( + #{ + traces := #{enable := true}, + metrics := #{enable := true}, + logs := #{enable := true} + }, + emqx:get_config(?CONF_PATH) + ), + + DisableAllReq = #{ + <<"traces">> => #{<<"enable">> => false}, + <<"metrics">> => #{<<"enable">> => false}, + <<"logs">> => #{<<"enable">> => false} + }, + ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, DisableAllReq)), + ?assertMatch( + #{ + traces := #{enable := false}, + metrics := #{enable := false}, + logs := #{enable := false} + }, + emqx:get_config(?CONF_PATH) + ). + +t_put_invalid(Config) -> + Auth = ?config(auth, Config), + Path = ?OTEL_API_PATH, + + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"endpoint">> => <<>>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"endpoint">> => <<"unknown://somehost.org">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"unknown_field">> => <<"foo">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"protocol">> => <<"unknown">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"traces">> => #{<<"filter">> => #{<<"unknown_filter">> => <<"foo">>}} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"logs">> => #{<<"level">> => <<"foo">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"metrics">> => #{<<"interval">> => <<"foo">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"logs">> => #{<<"unknown_field">> => <<"foo">>} + }) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"unknown_field">> => <<"foo">>}) + ). + +t_put_valid(Config) -> + Auth = ?config(auth, Config), + Path = ?OTEL_API_PATH, + + ?assertMatch( + {ok, _}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{ + <<"exporter">> => #{<<"endpoint">> => <<"nohost.com">>} + }) + ), + ?assertEqual(<<"http://nohost.com/">>, emqx:get_config(?CONF_PATH ++ [exporter, endpoint])), + + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"exporter">> => #{}}) + ), + ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{})), + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"traces">> => #{}}) + ), + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"logs">> => #{}}) + ), + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"metrics">> => #{}}) + ), + ?assertMatch( + {ok, _}, + emqx_mgmt_api_test_util:request_api( + put, + Path, + "", + Auth, + #{<<"exporter">> => #{}, <<"traces">> => #{}, <<"logs">> => #{}, <<"metrics">> => #{}} + ) + ), + ?assertMatch( + {ok, _}, + emqx_mgmt_api_test_util:request_api( + put, + Path, + "", + Auth, + #{ + <<"exporter">> => #{ + <<"endpoint">> => <<"https://localhost:4317">>, <<"protocol">> => <<"grpc">> + }, + <<"traces">> => #{ + <<"enable">> => true, + <<"max_queue_size">> => 10, + <<"exporting_timeout">> => <<"10s">>, + <<"scheduled_delay">> => <<"20s">>, + <<"filter">> => #{<<"trace_all">> => true} + }, + <<"logs">> => #{ + <<"level">> => <<"warning">>, + <<"max_queue_size">> => 100, + <<"exporting_timeout">> => <<"10s">>, + <<"scheduled_delay">> => <<"1s">> + }, + <<"metrics">> => #{ + %% alias for "interval" + <<"scheduled_delay">> => <<"15321ms">> + } + } + ), + %% alias check + ?assertEqual(15_321, emqx:get_config(?CONF_PATH ++ [metrics, interval])) + ). diff --git a/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl new file mode 100644 index 000000000..f5682dcad --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl @@ -0,0 +1,201 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_otel_schema_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% Backward compatibility suite for `upgrade_raw_conf/1`, +%% expected callback is `emqx_otel_schema:upgrade_legacy_metrics/1` + +-define(OLD_CONF_ENABLED, << + "\n" + "opentelemetry\n" + "{\n" + " enable = true\n" + "}\n" +>>). + +-define(OLD_CONF_DISABLED, << + "\n" + "opentelemetry\n" + "{\n" + " enable = false\n" + "}\n" +>>). + +-define(OLD_CONF_ENABLED_EXPORTER, << + "\n" + "opentelemetry\n" + "{\n" + " enable = true\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n" + "}\n" +>>). + +-define(OLD_CONF_DISABLED_EXPORTER, << + "\n" + "opentelemetry\n" + "{\n" + " enable = false\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n" + "}\n" +>>). + +-define(OLD_CONF_EXPORTER, << + "\n" + "opentelemetry\n" + "{\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n" + "}\n" +>>). + +-define(OLD_CONF_EXPORTER_PARTIAL, << + "\n" + "opentelemetry\n" + "{\n" + " exporter {endpoint = \"http://127.0.0.1:4317/\"}\n" + "}\n" +>>). + +-define(OLD_CONF_EXPORTER_PARTIAL1, << + "\n" + "opentelemetry\n" + "{\n" + " exporter {interval = 3s}\n" + "}\n" +>>). + +-define(TESTS_CONF, #{ + t_old_conf_enabled => ?OLD_CONF_ENABLED, + t_old_conf_disabled => ?OLD_CONF_DISABLED, + t_old_conf_enabled_exporter => ?OLD_CONF_ENABLED_EXPORTER, + t_old_conf_disabled_exporter => ?OLD_CONF_DISABLED_EXPORTER, + t_old_conf_exporter => ?OLD_CONF_EXPORTER, + t_old_conf_exporter_partial => ?OLD_CONF_EXPORTER_PARTIAL, + t_old_conf_exporter_partial1 => ?OLD_CONF_EXPORTER_PARTIAL1 +}). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(TC, Config) -> + Apps = start_apps(TC, Config, maps:get(TC, ?TESTS_CONF)), + [{suite_apps, Apps} | Config]. + +end_per_testcase(_TC, Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(), + ok. + +start_apps(TC, Config, OtelConf) -> + emqx_cth_suite:start( + [ + {emqx_conf, OtelConf}, + emqx_management, + emqx_opentelemetry + ], + #{work_dir => emqx_cth_suite:work_dir(TC, Config)} + ). + +t_old_conf_enabled(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{metrics := #{enable := true, interval := _}, exporter := #{endpoint := _}}, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_disabled(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{metrics := #{enable := false, interval := _}, exporter := #{endpoint := _}}, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_enabled_exporter(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := true, interval := 5000}, + exporter := #{endpoint := <<"http://127.0.0.1:4317/">>} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_disabled_exporter(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := false, interval := 5000}, + exporter := #{endpoint := <<"http://127.0.0.1:4317/">>} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_exporter(_Config) -> + io:format(user, "TC running: ~p~n", [?FUNCTION_NAME]), + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := false, interval := 5000}, + exporter := #{endpoint := <<"http://127.0.0.1:4317/">>} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_exporter_partial(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := false, interval := _}, + exporter := #{endpoint := <<"http://127.0.0.1:4317/">>} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). + +t_old_conf_exporter_partial1(_Config) -> + OtelConf = emqx:get_config([opentelemetry]), + ?assertMatch( + #{ + metrics := #{enable := false, interval := 3000}, + exporter := #{endpoint := _} + }, + OtelConf + ), + ?assertNot(erlang:is_map_key(enable, OtelConf)), + ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))). diff --git a/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl b/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl new file mode 100644 index 000000000..88917d7e3 --- /dev/null +++ b/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl @@ -0,0 +1,431 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_otel_trace_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(OTEL_SERVICE_NAME, "emqx"). +-define(CONF_PATH, [opentelemetry]). + +%% How to run it locally: +%% 1. Uncomment networks in .ci/docker-compose-file/docker-compose-otel.yaml, +%% Uncomment OTLP gRPC ports mappings for otel-collector and otel-collector-tls services. +%% Uncomment jaeger-all-in-one prots maooing. +%% 2. Start deps services: +%% DOCKER_USER="$(id -u)" docker-compose -f .ci/docker-compose-file/docker-compose-otel.yaml up +%% 3. Run tests with special env variables: +%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \ +%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \ +%% make "apps/emqx_opentelemetry-ct" +%% Or run only this suite: +%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \ +%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \ +%% ./rebar3 ct -v --readable=true --name 'test@127.0.0.1' \ +%% --suite apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, TCs}, + {tls, TCs} + ]. + +init_per_suite(Config) -> + %% This is called by emqx_machine in EMQX release + emqx_otel_app:configure_otel_deps(), + %% No release name during the test case, we need a reliable service name to query Jaeger + os:putenv("OTEL_SERVICE_NAME", ?OTEL_SERVICE_NAME), + JaegerURL = os:getenv("JAEGER_URL", "http://jaeger.emqx.net:16686"), + [{jaeger_url, JaegerURL} | Config]. + +end_per_suite(_) -> + os:unsetenv("OTEL_SERVICE_NAME"), + ok. + +init_per_group(tcp = Group, Config) -> + OtelCollectorURL = os:getenv("OTEL_COLLECTOR_URL", "http://otel-collector.emqx.net:4317"), + [ + {otel_collector_url, OtelCollectorURL}, + {logs_exporter_file_path, logs_exporter_file_path(Group, Config)} + | Config + ]; +init_per_group(tls = Group, Config) -> + OtelCollectorURL = os:getenv( + "OTEL_COLLECTOR_TLS_URL", "https://otel-collector-tls.emqx.net:4317" + ), + [ + {otel_collector_url, OtelCollectorURL}, + {logs_exporter_file_path, logs_exporter_file_path(Group, Config)} + | Config + ]. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(t_distributed_trace = TC, Config) -> + Cluster = cluster(TC, Config), + [{cluster, Cluster} | Config]; +init_per_testcase(TC, Config) -> + Apps = emqx_cth_suite:start(apps_spec(), #{work_dir => emqx_cth_suite:work_dir(TC, Config)}), + [{suite_apps, Apps} | Config]. + +end_per_testcase(t_distributed_trace = _TC, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + emqx_config:delete_override_conf_files(), + ok; +end_per_testcase(_TC, Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(), + ok. + +t_trace(Config) -> + MqttHostPort = mqtt_host_port(), + + {ok, _} = emqx_conf:update(?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}), + + Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>, + TopicNoSubs = <<"t/trace/test/nosub/", (atom_to_binary(?FUNCTION_NAME))/binary>>, + + SubConn1 = connect(MqttHostPort, <<"sub1">>), + {ok, _, [0]} = emqtt:subscribe(SubConn1, Topic), + SubConn2 = connect(MqttHostPort, <<"sub2">>), + {ok, _, [0]} = emqtt:subscribe(SubConn2, Topic), + PubConn = connect(MqttHostPort, <<"pub">>), + + TraceParent = traceparent(true), + TraceParentNotSampled = traceparent(false), + ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []), + ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []), + + TraceParentNoSub = traceparent(true), + TraceParentNoSubNotSampled = traceparent(false), + ok = emqtt:publish(PubConn, TopicNoSubs, props(TraceParentNoSub), <<"must be traced">>, []), + ok = emqtt:publish( + PubConn, TopicNoSubs, props(TraceParentNoSubNotSampled), <<"must not be traced">>, [] + ), + + ?assertEqual( + ok, + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)), + [Trace] = filter_traces(trace_id(TraceParent), Traces), + [] = filter_traces(trace_id(TraceParentNotSampled), Traces), + [TraceNoSub] = filter_traces(trace_id(TraceParentNoSub), Traces), + [] = filter_traces(trace_id(TraceParentNoSubNotSampled), Traces), + + #{<<"spans">> := Spans, <<"processes">> := _} = Trace, + %% 2 sub spans and 1 publish process span + IsExpectedSpansLen = length(Spans) =:= 3, + + #{<<"spans">> := SpansNoSub, <<"processes">> := _} = TraceNoSub, + %% Only 1 publish process span + IsExpectedSpansLen andalso 1 =:= length(SpansNoSub) + end, + 10_000 + ) + ), + stop_conns([SubConn1, SubConn2, PubConn]). + +t_trace_disabled(_Config) -> + ?assertNot(emqx:get_config(?CONF_PATH ++ [traces, enable])), + %% Tracer must be actually disabled + ?assertEqual({otel_tracer_noop, []}, opentelemetry:get_tracer()), + ?assertEqual(undefined, emqx_external_trace:provider()), + + Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>, + + SubConn = connect(mqtt_host_port(), <<"sub">>), + {ok, _, [0]} = emqtt:subscribe(SubConn, Topic), + PubConn = connect(mqtt_host_port(), <<"pub">>), + + TraceParent = traceparent(true), + emqtt:publish(PubConn, Topic, props(TraceParent), <<>>, []), + receive + {publish, #{topic := Topic, properties := Props}} -> + %% traceparent must be propagated by EMQX even if internal otel trace is disabled + #{'User-Property' := [{<<"traceparent">>, TrParent}]} = Props, + ?assertEqual(TraceParent, TrParent) + after 10_000 -> + ct:fail("published_message_not_received") + end, + + %% if otel trace is registered but is actually not running, EMQX must work fine + %% and the message must be delivered to the subscriber + ok = emqx_otel_trace:toggle_registered(true), + TraceParent1 = traceparent(true), + emqtt:publish(PubConn, Topic, props(TraceParent1), <<>>, []), + receive + {publish, #{topic := Topic, properties := Props1}} -> + #{'User-Property' := [{<<"traceparent">>, TrParent1}]} = Props1, + ?assertEqual(TraceParent1, TrParent1) + after 10_000 -> + ct:fail("published_message_not_received") + end, + stop_conns([SubConn, PubConn]). + +t_trace_all(Config) -> + OtelConf = enabled_trace_conf(Config), + OtelConf1 = emqx_utils_maps:deep_put([<<"traces">>, <<"filter">>], OtelConf, #{ + <<"trace_all">> => true + }), + {ok, _} = emqx_conf:update(?CONF_PATH, OtelConf1, #{override_to => cluster}), + + Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>, + ClientId = <<"pub-", (integer_to_binary(erlang:system_time(nanosecond)))/binary>>, + PubConn = connect(mqtt_host_port(), ClientId), + emqtt:publish(PubConn, Topic, #{}, <<>>, []), + + ?assertEqual( + ok, + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)), + Res = lists:filter( + fun(#{<<"spans">> := Spans}) -> + case Spans of + %% Only one span is expected as there are no subscribers + [#{<<"tags">> := Tags}] -> + lists:any( + fun(#{<<"key">> := K, <<"value">> := Val}) -> + K =:= <<"messaging.client_id">> andalso Val =:= ClientId + end, + Tags + ); + _ -> + false + end + end, + Traces + ), + %% Expecting exactly 1 span + length(Res) =:= 1 + end, + 10_000 + ) + ), + stop_conns([PubConn]). + +t_distributed_trace(Config) -> + [Core1, Core2, Repl] = Cluster = ?config(cluster, Config), + {ok, _} = rpc:call( + Core1, + emqx_conf, + update, + [?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}] + ), + Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>, + + SubConn1 = connect(mqtt_host_port(Core1), <<"sub1">>), + {ok, _, [0]} = emqtt:subscribe(SubConn1, Topic), + SubConn2 = connect(mqtt_host_port(Core2), <<"sub2">>), + {ok, _, [0]} = emqtt:subscribe(SubConn2, Topic), + SubConn3 = connect(mqtt_host_port(Repl), <<"sub3">>), + {ok, _, [0]} = emqtt:subscribe(SubConn3, Topic), + + PubConn = connect(mqtt_host_port(Repl), <<"pub">>), + + TraceParent = traceparent(true), + TraceParentNotSampled = traceparent(false), + + ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []), + ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []), + + ?assertEqual( + ok, + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)), + [Trace] = filter_traces(trace_id(TraceParent), Traces), + + [] = filter_traces(trace_id(TraceParentNotSampled), Traces), + + #{<<"spans">> := Spans, <<"processes">> := Procs} = Trace, + + %% 3 sub spans and 1 publish process span + 4 = length(Spans), + [_, _, _] = SendSpans = filter_spans(<<"send_published_message">>, Spans), + + IsAllNodesSpans = + lists:sort([atom_to_binary(N) || N <- Cluster]) =:= + lists:sort([span_node(S, Procs) || S <- SendSpans]), + + [PubSpan] = filter_spans(<<"process_message">>, Spans), + atom_to_binary(Repl) =:= span_node(PubSpan, Procs) andalso IsAllNodesSpans + end, + 10_000 + ) + ), + stop_conns([SubConn1, SubConn2, SubConn3, PubConn]). + +%% Keeping this test in this SUITE as there is no separate module for logs +t_log(Config) -> + Level = emqx_logger:get_primary_log_level(), + LogsConf = #{ + <<"logs">> => #{ + <<"enable">> => true, + <<"level">> => atom_to_binary(Level), + <<"scheduled_delay">> => <<"20ms">> + }, + <<"exporter">> => exporter_conf(Config) + }, + {ok, _} = emqx_conf:update(?CONF_PATH, LogsConf, #{override_to => cluster}), + + %% Ids are only needed for matching logs in the file exported by otel-collector + Id = integer_to_binary(otel_id_generator:generate_trace_id()), + ?SLOG(Level, #{msg => "otel_test_log_message", id => Id}), + Id1 = integer_to_binary(otel_id_generator:generate_trace_id()), + logger:Level("Ordinary log message, id: ~p", [Id1]), + + ?assertEqual( + ok, + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + {ok, Logs} = file:read_file(?config(logs_exporter_file_path, Config)), + binary:match(Logs, Id) =/= nomatch andalso binary:match(Logs, Id1) =/= nomatch + end, + 10_000 + ) + ). + +logs_exporter_file_path(Group, Config) -> + filename:join([project_dir(Config), logs_exporter_filename(Group)]). + +project_dir(Config) -> + filename:join( + lists:takewhile( + fun(PathPart) -> PathPart =/= "_build" end, + filename:split(?config(priv_dir, Config)) + ) + ). + +logs_exporter_filename(tcp) -> + ".ci/docker-compose-file/otel/otel-collector.json"; +logs_exporter_filename(tls) -> + ".ci/docker-compose-file/otel/otel-collector-tls.json". + +enabled_trace_conf(TcConfig) -> + #{ + <<"traces">> => #{ + <<"enable">> => true, + <<"scheduled_delay">> => <<"50ms">> + }, + <<"exporter">> => exporter_conf(TcConfig) + }. + +exporter_conf(TcConfig) -> + #{<<"endpoint">> => ?config(otel_collector_url, TcConfig)}. + +span_node(#{<<"processID">> := ProcId}, Procs) -> + #{ProcId := #{<<"tags">> := ProcTags}} = Procs, + [#{<<"value">> := Node}] = lists:filter( + fun(#{<<"key">> := K}) -> + K =:= <<"service.instance.id">> + end, + ProcTags + ), + Node. + +trace_id(<<"00-", TraceId:32/binary, _/binary>>) -> + TraceId. + +filter_traces(TraceId, Traces) -> + lists:filter(fun(#{<<"traceID">> := TrId}) -> TrId =:= TraceId end, Traces). + +filter_spans(OpName, Spans) -> + lists:filter(fun(#{<<"operationName">> := Name}) -> Name =:= OpName end, Spans). + +get_jaeger_traces(JagerBaseURL) -> + case httpc:request(JagerBaseURL ++ "/api/traces?service=" ++ ?OTEL_SERVICE_NAME) of + {ok, {{_, 200, _}, _, RespBpdy}} -> + {ok, emqx_utils_json:decode(RespBpdy)}; + Err -> + ct:pal("Jager error: ~p", Err), + Err + end. + +stop_conns(Conns) -> + lists:foreach(fun emqtt:stop/1, Conns). + +props(TraceParent) -> + #{'User-Property' => [{<<"traceparent">>, TraceParent}]}. + +traceparent(IsSampled) -> + TraceId = otel_id_generator:generate_trace_id(), + SpanId = otel_id_generator:generate_span_id(), + {ok, TraceIdHexStr} = otel_utils:format_binary_string("~32.16.0b", [TraceId]), + {ok, SpanIdHexStr} = otel_utils:format_binary_string("~16.16.0b", [SpanId]), + TraceFlags = + case IsSampled of + true -> <<"01">>; + false -> <<"00">> + end, + <<"00-", TraceIdHexStr/binary, "-", SpanIdHexStr/binary, "-", TraceFlags/binary>>. + +connect({Host, Port}, ClientId) -> + {ok, ConnPid} = emqtt:start_link([ + {proto_ver, v5}, + {host, Host}, + {port, Port}, + {clientid, ClientId} + ]), + {ok, _} = emqtt:connect(ConnPid), + ConnPid. + +mqtt_host_port() -> + emqx:get_config([listeners, tcp, default, bind]). + +mqtt_host_port(Node) -> + rpc:call(Node, emqx, get_config, [[listeners, tcp, default, bind]]). + +cluster(TC, Config) -> + Nodes = emqx_cth_cluster:start( + [ + {otel_trace_core1, #{role => core, apps => apps_spec()}}, + {otel_trace_core2, #{role => core, apps => apps_spec()}}, + {otel_trace_replicant, #{role => replicant, apps => apps_spec()}} + ], + #{work_dir => emqx_cth_suite:work_dir(TC, Config)} + ), + Nodes. + +apps_spec() -> + [ + emqx, + emqx_conf, + emqx_management, + emqx_opentelemetry + ]. diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 4de3b4d7c..7959581a9 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -243,6 +243,9 @@ for dep in ${CT_DEPS}; do ldap) FILES+=( '.ci/docker-compose-file/docker-compose-ldap.yaml' ) ;; + otel) + FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1