test(emqx_opentelemetry): add trace test suite

This commit is contained in:
Serge Tupchii 2023-11-29 18:27:44 +02:00
parent c8e69357cc
commit 85441fda0f
11 changed files with 1078 additions and 13 deletions

View File

@ -0,0 +1,69 @@
version: '3.9'
services:
jaeger-all-in-one:
image: jaegertracing/all-in-one:1.51.0
container_name: jaeger.emqx.net
hostname: jaeger.emqx.net
networks:
- emqx_bridge
restart: always
# ports:
# - "16686:16686"
user: "${DOCKER_USER:-root}"
# Collector
otel-collector:
image: otel/opentelemetry-collector:0.90.0
container_name: otel-collector.emqx.net
hostname: otel-collector.emqx.net
networks:
- emqx_bridge
restart: always
command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel:/etc/
# ports:
# - "1888:1888" # pprof extension
# - "8888:8888" # Prometheus metrics exposed by the collector
# - "8889:8889" # Prometheus exporter metrics
# - "13133:13133" # health_check extension
# - "4317:4317" # OTLP gRPC receiver
# - "4318:4318" # OTLP http receiver
# - "55679:55679" # zpages extension
depends_on:
- jaeger-all-in-one
user: "${DOCKER_USER:-root}"
# Collector
otel-collector-tls:
image: otel/opentelemetry-collector:0.90.0
container_name: otel-collector-tls.emqx.net
hostname: otel-collector-tls.emqx.net
networks:
- emqx_bridge
restart: always
command: ["--config=/etc/otel-collector-config-tls.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel:/etc/
- ./certs:/etc/certs
# ports:
# - "14317:4317" # OTLP gRPC receiver
depends_on:
- jaeger-all-in-one
user: "${DOCKER_USER:-root}"
#networks:
# emqx_bridge:
# driver: bridge
# name: emqx_bridge
# enable_ipv6: true
# ipam:
# driver: default
# config:
# - subnet: 172.100.239.0/24
# gateway: 172.100.239.1
# - subnet: 2001:3200:3200::/64
# gateway: 2001:3200:3200::1
#

View File

@ -0,0 +1,6 @@
certs
hostname
hosts
otel-collector.json
otel-collector-tls.json
resolv.conf

View File

@ -0,0 +1,52 @@
receivers:
otlp:
protocols:
grpc:
tls:
ca_file: /etc/certs/ca.crt
cert_file: /etc/certs/server.crt
key_file: /etc/certs/server.key
http:
tls:
ca_file: /etc/certs/ca.crt
cert_file: /etc/certs/server.crt
key_file: /etc/certs/server.key
exporters:
logging:
verbosity: detailed
otlp:
endpoint: jaeger.emqx.net:4317
tls:
insecure: true
debug:
verbosity: detailed
file:
path: /etc/otel-collector-tls.json
processors:
batch:
# send data immediately
timeout: 0
extensions:
health_check:
zpages:
endpoint: :55679
service:
extensions: [zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging, file]

View File

@ -0,0 +1,51 @@
receivers:
otlp:
protocols:
grpc:
tls:
# ca_file: /etc/ca.pem
# cert_file: /etc/server.pem
# key_file: /etc/server.key
http:
tls:
# ca_file: /etc/ca.pem
# cert_file: /etc/server.pem
# key_file: /etc/server.key
exporters:
logging:
verbosity: detailed
otlp:
endpoint: jaeger.emqx.net:4317
tls:
insecure: true
debug:
verbosity: detailed
file:
path: /etc/otel-collector.json
processors:
batch:
# send data immediately
timeout: 0
extensions:
health_check:
zpages:
endpoint: :55679
service:
extensions: [zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging, file]

View File

@ -28,6 +28,7 @@
-callback event(EventName :: term(), Attributes :: term()) -> ok.
-export([
provider/0,
register_provider/1,
unregister_provider/1,
trace_process_publish/3,
@ -71,6 +72,9 @@ unregister_provider(Module) ->
{error, not_registered}
end.
-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------

View File

@ -0,0 +1 @@
otel

View File

@ -103,24 +103,19 @@ otel_config_schema() ->
otel_config_example() ->
#{
exporter => #{
endpoint => "http://localhost:4317",
ssl_options => #{}
},
logs => #{
enable => true,
exporter => #{
endpoint => "http://localhost:4317",
ssl_options => #{
enable => false
}
},
level => warning
},
metrics => #{
enable => true
},
traces => #{
enable => true,
exporter => #{
endpoint => "http://localhost:4317",
interval => "10s",
ssl_options => #{
enable => false
}
}
filter => #{trace_all => false}
}
}.

View File

@ -0,0 +1,252 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(OTEL_API_PATH, emqx_mgmt_api_test_util:api_path(["opentelemetry"])).
-define(CONF_PATH, [opentelemetry]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
%% This is called by emqx_machine in EMQX release
emqx_otel_app:configure_otel_deps(),
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
emqx_opentelemetry
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
Auth = auth_header(),
[{suite_apps, Apps}, {auth, Auth} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files(),
ok.
init_per_testcase(_TC, Config) ->
emqx_conf:update(
?CONF_PATH,
#{
<<"traces">> => #{<<"enable">> => false},
<<"metrics">> => #{<<"enable">> => false},
<<"logs">> => #{<<"enable">> => false}
},
#{}
),
Config.
end_per_testcase(_TC, _Config) ->
ok.
auth_header() ->
{ok, API} = emqx_common_test_http:create_default_app(),
emqx_common_test_http:auth_header(API).
t_get(Config) ->
Auth = ?config(auth, Config),
Path = ?OTEL_API_PATH,
{ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
?assertMatch(
#{
<<"traces">> := #{<<"enable">> := false},
<<"metrics">> := #{<<"enable">> := false},
<<"logs">> := #{<<"enable">> := false}
},
emqx_utils_json:decode(Resp)
).
t_put_enable_disable(Config) ->
Auth = ?config(auth, Config),
Path = ?OTEL_API_PATH,
EnableAllReq = #{
<<"traces">> => #{<<"enable">> => true},
<<"metrics">> => #{<<"enable">> => true},
<<"logs">> => #{<<"enable">> => true}
},
?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, EnableAllReq)),
?assertMatch(
#{
traces := #{enable := true},
metrics := #{enable := true},
logs := #{enable := true}
},
emqx:get_config(?CONF_PATH)
),
DisableAllReq = #{
<<"traces">> => #{<<"enable">> => false},
<<"metrics">> => #{<<"enable">> => false},
<<"logs">> => #{<<"enable">> => false}
},
?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, DisableAllReq)),
?assertMatch(
#{
traces := #{enable := false},
metrics := #{enable := false},
logs := #{enable := false}
},
emqx:get_config(?CONF_PATH)
).
t_put_invalid(Config) ->
Auth = ?config(auth, Config),
Path = ?OTEL_API_PATH,
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"endpoint">> => <<>>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"endpoint">> => <<"unknown://somehost.org">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"unknown_field">> => <<"foo">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"protocol">> => <<"unknown">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"traces">> => #{<<"filter">> => #{<<"unknown_filter">> => <<"foo">>}}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"logs">> => #{<<"level">> => <<"foo">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"metrics">> => #{<<"interval">> => <<"foo">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"logs">> => #{<<"unknown_field">> => <<"foo">>}
})
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"unknown_field">> => <<"foo">>})
).
t_put_valid(Config) ->
Auth = ?config(auth, Config),
Path = ?OTEL_API_PATH,
?assertMatch(
{ok, _},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
<<"exporter">> => #{<<"endpoint">> => <<"nohost.com">>}
})
),
?assertEqual(<<"http://nohost.com/">>, emqx:get_config(?CONF_PATH ++ [exporter, endpoint])),
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"exporter">> => #{}})
),
?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{})),
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"traces">> => #{}})
),
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"logs">> => #{}})
),
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"metrics">> => #{}})
),
?assertMatch(
{ok, _},
emqx_mgmt_api_test_util:request_api(
put,
Path,
"",
Auth,
#{<<"exporter">> => #{}, <<"traces">> => #{}, <<"logs">> => #{}, <<"metrics">> => #{}}
)
),
?assertMatch(
{ok, _},
emqx_mgmt_api_test_util:request_api(
put,
Path,
"",
Auth,
#{
<<"exporter">> => #{
<<"endpoint">> => <<"https://localhost:4317">>, <<"protocol">> => <<"grpc">>
},
<<"traces">> => #{
<<"enable">> => true,
<<"max_queue_size">> => 10,
<<"exporting_timeout">> => <<"10s">>,
<<"scheduled_delay">> => <<"20s">>,
<<"filter">> => #{<<"trace_all">> => true}
},
<<"logs">> => #{
<<"level">> => <<"warning">>,
<<"max_queue_size">> => 100,
<<"exporting_timeout">> => <<"10s">>,
<<"scheduled_delay">> => <<"1s">>
},
<<"metrics">> => #{
%% alias for "interval"
<<"scheduled_delay">> => <<"15321ms">>
}
}
),
%% alias check
?assertEqual(15_321, emqx:get_config(?CONF_PATH ++ [metrics, interval]))
).

View File

@ -0,0 +1,201 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_schema_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Backward compatibility suite for `upgrade_raw_conf/1`,
%% expected callback is `emqx_otel_schema:upgrade_legacy_metrics/1`
-define(OLD_CONF_ENABLED, <<
"\n"
"opentelemetry\n"
"{\n"
" enable = true\n"
"}\n"
>>).
-define(OLD_CONF_DISABLED, <<
"\n"
"opentelemetry\n"
"{\n"
" enable = false\n"
"}\n"
>>).
-define(OLD_CONF_ENABLED_EXPORTER, <<
"\n"
"opentelemetry\n"
"{\n"
" enable = true\n"
" exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
"}\n"
>>).
-define(OLD_CONF_DISABLED_EXPORTER, <<
"\n"
"opentelemetry\n"
"{\n"
" enable = false\n"
" exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
"}\n"
>>).
-define(OLD_CONF_EXPORTER, <<
"\n"
"opentelemetry\n"
"{\n"
" exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
"}\n"
>>).
-define(OLD_CONF_EXPORTER_PARTIAL, <<
"\n"
"opentelemetry\n"
"{\n"
" exporter {endpoint = \"http://127.0.0.1:4317/\"}\n"
"}\n"
>>).
-define(OLD_CONF_EXPORTER_PARTIAL1, <<
"\n"
"opentelemetry\n"
"{\n"
" exporter {interval = 3s}\n"
"}\n"
>>).
-define(TESTS_CONF, #{
t_old_conf_enabled => ?OLD_CONF_ENABLED,
t_old_conf_disabled => ?OLD_CONF_DISABLED,
t_old_conf_enabled_exporter => ?OLD_CONF_ENABLED_EXPORTER,
t_old_conf_disabled_exporter => ?OLD_CONF_DISABLED_EXPORTER,
t_old_conf_exporter => ?OLD_CONF_EXPORTER,
t_old_conf_exporter_partial => ?OLD_CONF_EXPORTER_PARTIAL,
t_old_conf_exporter_partial1 => ?OLD_CONF_EXPORTER_PARTIAL1
}).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(TC, Config) ->
Apps = start_apps(TC, Config, maps:get(TC, ?TESTS_CONF)),
[{suite_apps, Apps} | Config].
end_per_testcase(_TC, Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files(),
ok.
start_apps(TC, Config, OtelConf) ->
emqx_cth_suite:start(
[
{emqx_conf, OtelConf},
emqx_management,
emqx_opentelemetry
],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
).
t_old_conf_enabled(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{metrics := #{enable := true, interval := _}, exporter := #{endpoint := _}},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_disabled(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{metrics := #{enable := false, interval := _}, exporter := #{endpoint := _}},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_enabled_exporter(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{
metrics := #{enable := true, interval := 5000},
exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_disabled_exporter(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{
metrics := #{enable := false, interval := 5000},
exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_exporter(_Config) ->
io:format(user, "TC running: ~p~n", [?FUNCTION_NAME]),
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{
metrics := #{enable := false, interval := 5000},
exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_exporter_partial(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{
metrics := #{enable := false, interval := _},
exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
t_old_conf_exporter_partial1(_Config) ->
OtelConf = emqx:get_config([opentelemetry]),
?assertMatch(
#{
metrics := #{enable := false, interval := 3000},
exporter := #{endpoint := _}
},
OtelConf
),
?assertNot(erlang:is_map_key(enable, OtelConf)),
?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).

View File

@ -0,0 +1,431 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_trace_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/logger.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(OTEL_SERVICE_NAME, "emqx").
-define(CONF_PATH, [opentelemetry]).
%% How to run it locally:
%% 1. Uncomment networks in .ci/docker-compose-file/docker-compose-otel.yaml,
%% Uncomment OTLP gRPC ports mappings for otel-collector and otel-collector-tls services.
%% Uncomment jaeger-all-in-one prots maooing.
%% 2. Start deps services:
%% DOCKER_USER="$(id -u)" docker-compose -f .ci/docker-compose-file/docker-compose-otel.yaml up
%% 3. Run tests with special env variables:
%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%% make "apps/emqx_opentelemetry-ct"
%% Or run only this suite:
%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%% ./rebar3 ct -v --readable=true --name 'test@127.0.0.1' \
%% --suite apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl
all() ->
[
{group, tcp},
{group, tls}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{tcp, TCs},
{tls, TCs}
].
init_per_suite(Config) ->
%% This is called by emqx_machine in EMQX release
emqx_otel_app:configure_otel_deps(),
%% No release name during the test case, we need a reliable service name to query Jaeger
os:putenv("OTEL_SERVICE_NAME", ?OTEL_SERVICE_NAME),
JaegerURL = os:getenv("JAEGER_URL", "http://jaeger.emqx.net:16686"),
[{jaeger_url, JaegerURL} | Config].
end_per_suite(_) ->
os:unsetenv("OTEL_SERVICE_NAME"),
ok.
init_per_group(tcp = Group, Config) ->
OtelCollectorURL = os:getenv("OTEL_COLLECTOR_URL", "http://otel-collector.emqx.net:4317"),
[
{otel_collector_url, OtelCollectorURL},
{logs_exporter_file_path, logs_exporter_file_path(Group, Config)}
| Config
];
init_per_group(tls = Group, Config) ->
OtelCollectorURL = os:getenv(
"OTEL_COLLECTOR_TLS_URL", "https://otel-collector-tls.emqx.net:4317"
),
[
{otel_collector_url, OtelCollectorURL},
{logs_exporter_file_path, logs_exporter_file_path(Group, Config)}
| Config
].
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_distributed_trace = TC, Config) ->
Cluster = cluster(TC, Config),
[{cluster, Cluster} | Config];
init_per_testcase(TC, Config) ->
Apps = emqx_cth_suite:start(apps_spec(), #{work_dir => emqx_cth_suite:work_dir(TC, Config)}),
[{suite_apps, Apps} | Config].
end_per_testcase(t_distributed_trace = _TC, Config) ->
emqx_cth_cluster:stop(?config(cluster, Config)),
emqx_config:delete_override_conf_files(),
ok;
end_per_testcase(_TC, Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files(),
ok.
t_trace(Config) ->
MqttHostPort = mqtt_host_port(),
{ok, _} = emqx_conf:update(?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}),
Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
TopicNoSubs = <<"t/trace/test/nosub/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
SubConn1 = connect(MqttHostPort, <<"sub1">>),
{ok, _, [0]} = emqtt:subscribe(SubConn1, Topic),
SubConn2 = connect(MqttHostPort, <<"sub2">>),
{ok, _, [0]} = emqtt:subscribe(SubConn2, Topic),
PubConn = connect(MqttHostPort, <<"pub">>),
TraceParent = traceparent(true),
TraceParentNotSampled = traceparent(false),
ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []),
ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []),
TraceParentNoSub = traceparent(true),
TraceParentNoSubNotSampled = traceparent(false),
ok = emqtt:publish(PubConn, TopicNoSubs, props(TraceParentNoSub), <<"must be traced">>, []),
ok = emqtt:publish(
PubConn, TopicNoSubs, props(TraceParentNoSubNotSampled), <<"must not be traced">>, []
),
?assertEqual(
ok,
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
{ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
[Trace] = filter_traces(trace_id(TraceParent), Traces),
[] = filter_traces(trace_id(TraceParentNotSampled), Traces),
[TraceNoSub] = filter_traces(trace_id(TraceParentNoSub), Traces),
[] = filter_traces(trace_id(TraceParentNoSubNotSampled), Traces),
#{<<"spans">> := Spans, <<"processes">> := _} = Trace,
%% 2 sub spans and 1 publish process span
IsExpectedSpansLen = length(Spans) =:= 3,
#{<<"spans">> := SpansNoSub, <<"processes">> := _} = TraceNoSub,
%% Only 1 publish process span
IsExpectedSpansLen andalso 1 =:= length(SpansNoSub)
end,
10_000
)
),
stop_conns([SubConn1, SubConn2, PubConn]).
t_trace_disabled(_Config) ->
?assertNot(emqx:get_config(?CONF_PATH ++ [traces, enable])),
%% Tracer must be actually disabled
?assertEqual({otel_tracer_noop, []}, opentelemetry:get_tracer()),
?assertEqual(undefined, emqx_external_trace:provider()),
Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
SubConn = connect(mqtt_host_port(), <<"sub">>),
{ok, _, [0]} = emqtt:subscribe(SubConn, Topic),
PubConn = connect(mqtt_host_port(), <<"pub">>),
TraceParent = traceparent(true),
emqtt:publish(PubConn, Topic, props(TraceParent), <<>>, []),
receive
{publish, #{topic := Topic, properties := Props}} ->
%% traceparent must be propagated by EMQX even if internal otel trace is disabled
#{'User-Property' := [{<<"traceparent">>, TrParent}]} = Props,
?assertEqual(TraceParent, TrParent)
after 10_000 ->
ct:fail("published_message_not_received")
end,
%% if otel trace is registered but is actually not running, EMQX must work fine
%% and the message must be delivered to the subscriber
ok = emqx_otel_trace:toggle_registered(true),
TraceParent1 = traceparent(true),
emqtt:publish(PubConn, Topic, props(TraceParent1), <<>>, []),
receive
{publish, #{topic := Topic, properties := Props1}} ->
#{'User-Property' := [{<<"traceparent">>, TrParent1}]} = Props1,
?assertEqual(TraceParent1, TrParent1)
after 10_000 ->
ct:fail("published_message_not_received")
end,
stop_conns([SubConn, PubConn]).
t_trace_all(Config) ->
OtelConf = enabled_trace_conf(Config),
OtelConf1 = emqx_utils_maps:deep_put([<<"traces">>, <<"filter">>], OtelConf, #{
<<"trace_all">> => true
}),
{ok, _} = emqx_conf:update(?CONF_PATH, OtelConf1, #{override_to => cluster}),
Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
ClientId = <<"pub-", (integer_to_binary(erlang:system_time(nanosecond)))/binary>>,
PubConn = connect(mqtt_host_port(), ClientId),
emqtt:publish(PubConn, Topic, #{}, <<>>, []),
?assertEqual(
ok,
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
{ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
Res = lists:filter(
fun(#{<<"spans">> := Spans}) ->
case Spans of
%% Only one span is expected as there are no subscribers
[#{<<"tags">> := Tags}] ->
lists:any(
fun(#{<<"key">> := K, <<"value">> := Val}) ->
K =:= <<"messaging.client_id">> andalso Val =:= ClientId
end,
Tags
);
_ ->
false
end
end,
Traces
),
%% Expecting exactly 1 span
length(Res) =:= 1
end,
10_000
)
),
stop_conns([PubConn]).
t_distributed_trace(Config) ->
[Core1, Core2, Repl] = Cluster = ?config(cluster, Config),
{ok, _} = rpc:call(
Core1,
emqx_conf,
update,
[?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}]
),
Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
SubConn1 = connect(mqtt_host_port(Core1), <<"sub1">>),
{ok, _, [0]} = emqtt:subscribe(SubConn1, Topic),
SubConn2 = connect(mqtt_host_port(Core2), <<"sub2">>),
{ok, _, [0]} = emqtt:subscribe(SubConn2, Topic),
SubConn3 = connect(mqtt_host_port(Repl), <<"sub3">>),
{ok, _, [0]} = emqtt:subscribe(SubConn3, Topic),
PubConn = connect(mqtt_host_port(Repl), <<"pub">>),
TraceParent = traceparent(true),
TraceParentNotSampled = traceparent(false),
ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []),
ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []),
?assertEqual(
ok,
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
{ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
[Trace] = filter_traces(trace_id(TraceParent), Traces),
[] = filter_traces(trace_id(TraceParentNotSampled), Traces),
#{<<"spans">> := Spans, <<"processes">> := Procs} = Trace,
%% 3 sub spans and 1 publish process span
4 = length(Spans),
[_, _, _] = SendSpans = filter_spans(<<"send_published_message">>, Spans),
IsAllNodesSpans =
lists:sort([atom_to_binary(N) || N <- Cluster]) =:=
lists:sort([span_node(S, Procs) || S <- SendSpans]),
[PubSpan] = filter_spans(<<"process_message">>, Spans),
atom_to_binary(Repl) =:= span_node(PubSpan, Procs) andalso IsAllNodesSpans
end,
10_000
)
),
stop_conns([SubConn1, SubConn2, SubConn3, PubConn]).
%% Keeping this test in this SUITE as there is no separate module for logs
t_log(Config) ->
Level = emqx_logger:get_primary_log_level(),
LogsConf = #{
<<"logs">> => #{
<<"enable">> => true,
<<"level">> => atom_to_binary(Level),
<<"scheduled_delay">> => <<"20ms">>
},
<<"exporter">> => exporter_conf(Config)
},
{ok, _} = emqx_conf:update(?CONF_PATH, LogsConf, #{override_to => cluster}),
%% Ids are only needed for matching logs in the file exported by otel-collector
Id = integer_to_binary(otel_id_generator:generate_trace_id()),
?SLOG(Level, #{msg => "otel_test_log_message", id => Id}),
Id1 = integer_to_binary(otel_id_generator:generate_trace_id()),
logger:Level("Ordinary log message, id: ~p", [Id1]),
?assertEqual(
ok,
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
{ok, Logs} = file:read_file(?config(logs_exporter_file_path, Config)),
binary:match(Logs, Id) =/= nomatch andalso binary:match(Logs, Id1) =/= nomatch
end,
10_000
)
).
logs_exporter_file_path(Group, Config) ->
filename:join([project_dir(Config), logs_exporter_filename(Group)]).
project_dir(Config) ->
filename:join(
lists:takewhile(
fun(PathPart) -> PathPart =/= "_build" end,
filename:split(?config(priv_dir, Config))
)
).
logs_exporter_filename(tcp) ->
".ci/docker-compose-file/otel/otel-collector.json";
logs_exporter_filename(tls) ->
".ci/docker-compose-file/otel/otel-collector-tls.json".
enabled_trace_conf(TcConfig) ->
#{
<<"traces">> => #{
<<"enable">> => true,
<<"scheduled_delay">> => <<"50ms">>
},
<<"exporter">> => exporter_conf(TcConfig)
}.
exporter_conf(TcConfig) ->
#{<<"endpoint">> => ?config(otel_collector_url, TcConfig)}.
span_node(#{<<"processID">> := ProcId}, Procs) ->
#{ProcId := #{<<"tags">> := ProcTags}} = Procs,
[#{<<"value">> := Node}] = lists:filter(
fun(#{<<"key">> := K}) ->
K =:= <<"service.instance.id">>
end,
ProcTags
),
Node.
trace_id(<<"00-", TraceId:32/binary, _/binary>>) ->
TraceId.
filter_traces(TraceId, Traces) ->
lists:filter(fun(#{<<"traceID">> := TrId}) -> TrId =:= TraceId end, Traces).
filter_spans(OpName, Spans) ->
lists:filter(fun(#{<<"operationName">> := Name}) -> Name =:= OpName end, Spans).
get_jaeger_traces(JagerBaseURL) ->
case httpc:request(JagerBaseURL ++ "/api/traces?service=" ++ ?OTEL_SERVICE_NAME) of
{ok, {{_, 200, _}, _, RespBpdy}} ->
{ok, emqx_utils_json:decode(RespBpdy)};
Err ->
ct:pal("Jager error: ~p", Err),
Err
end.
stop_conns(Conns) ->
lists:foreach(fun emqtt:stop/1, Conns).
props(TraceParent) ->
#{'User-Property' => [{<<"traceparent">>, TraceParent}]}.
traceparent(IsSampled) ->
TraceId = otel_id_generator:generate_trace_id(),
SpanId = otel_id_generator:generate_span_id(),
{ok, TraceIdHexStr} = otel_utils:format_binary_string("~32.16.0b", [TraceId]),
{ok, SpanIdHexStr} = otel_utils:format_binary_string("~16.16.0b", [SpanId]),
TraceFlags =
case IsSampled of
true -> <<"01">>;
false -> <<"00">>
end,
<<"00-", TraceIdHexStr/binary, "-", SpanIdHexStr/binary, "-", TraceFlags/binary>>.
connect({Host, Port}, ClientId) ->
{ok, ConnPid} = emqtt:start_link([
{proto_ver, v5},
{host, Host},
{port, Port},
{clientid, ClientId}
]),
{ok, _} = emqtt:connect(ConnPid),
ConnPid.
mqtt_host_port() ->
emqx:get_config([listeners, tcp, default, bind]).
mqtt_host_port(Node) ->
rpc:call(Node, emqx, get_config, [[listeners, tcp, default, bind]]).
cluster(TC, Config) ->
Nodes = emqx_cth_cluster:start(
[
{otel_trace_core1, #{role => core, apps => apps_spec()}},
{otel_trace_core2, #{role => core, apps => apps_spec()}},
{otel_trace_replicant, #{role => replicant, apps => apps_spec()}}
],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
Nodes.
apps_spec() ->
[
emqx,
emqx_conf,
emqx_management,
emqx_opentelemetry
].

View File

@ -243,6 +243,9 @@ for dep in ${CT_DEPS}; do
ldap)
FILES+=( '.ci/docker-compose-file/docker-compose-ldap.yaml' )
;;
otel)
FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1