emqx/apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl

432 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_trace_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/logger.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(OTEL_SERVICE_NAME, "emqx").
-define(CONF_PATH, [opentelemetry]).
%% How to run it locally:
%% 1. Uncomment networks in .ci/docker-compose-file/docker-compose-otel.yaml,
%% Uncomment OTLP gRPC ports mappings for otel-collector and otel-collector-tls services.
%% Uncomment jaeger-all-in-one ports mapping.
%% 2. Start deps services:
%% DOCKER_USER="$(id -u)" docker-compose -f .ci/docker-compose-file/docker-compose-otel.yaml up
%% 3. Run tests with special env variables:
%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%% make "apps/emqx_opentelemetry-ct"
%% Or run only this suite:
%% PROFILE=emqx JAEGER_URL="http://localhost:16686" \
%% OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
%% ./rebar3 ct -v --readable=true --name 'test@127.0.0.1' \
%% --suite apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl
%% Common Test entry point: the suite is run as two groups, `tcp` and `tls`.
all() ->
    [{group, GroupName} || GroupName <- [tcp, tls]].
%% Both groups contain the same list of test cases, as returned by
%% emqx_common_test_helpers:all/1 for this module.
groups() ->
    TestCases = emqx_common_test_helpers:all(?MODULE),
    [{GroupName, TestCases} || GroupName <- [tcp, tls]].
%% Suite-wide setup.
%% Returns Config extended with `jaeger_url`, taken from the JAEGER_URL
%% environment variable (with a default used when it is not set).
init_per_suite(Config) ->
    %% In an EMQX release this call is made by emqx_machine
    emqx_otel_app:configure_otel_deps(),
    %% There is no release name while running tests, so a fixed service name
    %% is set to be able to query Jaeger reliably
    os:putenv("OTEL_SERVICE_NAME", ?OTEL_SERVICE_NAME),
    [{jaeger_url, os:getenv("JAEGER_URL", "http://jaeger.emqx.net:16686")} | Config].
%% Suite-wide teardown: removes the OTEL_SERVICE_NAME environment variable
%% that init_per_suite/1 sets.
end_per_suite(_Config) ->
    _ = os:unsetenv("OTEL_SERVICE_NAME"),
    ok.
%% Group setup: picks the collector endpoint for the group.
%% The endpoint is read from OTEL_COLLECTOR_URL (tcp) or OTEL_COLLECTOR_TLS_URL (tls),
%% falling back to a default when the variable is not set.
%% Returns Config extended with `otel_collector_url` and `logs_exporter_file_path`.
init_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
    CollectorURL =
        case Group of
            tcp ->
                os:getenv("OTEL_COLLECTOR_URL", "http://otel-collector.emqx.net:4317");
            tls ->
                os:getenv("OTEL_COLLECTOR_TLS_URL", "https://otel-collector-tls.emqx.net:4317")
        end,
    ExportedLogsPath = logs_exporter_file_path(Group, Config),
    [
        {otel_collector_url, CollectorURL},
        {logs_exporter_file_path, ExportedLogsPath}
        | Config
    ].
%% Group teardown: nothing to clean up.
end_per_group(_, _) -> ok.
%% Test case setup.
%% t_distributed_trace gets the result of cluster/2 stored under `cluster`;
%% every other test case gets the result of emqx_cth_suite:start/2 (called with
%% apps_spec/0 and a per-test-case work dir) stored under `suite_apps`.
init_per_testcase(t_distributed_trace = TestCase, Config) ->
    [{cluster, cluster(TestCase, Config)} | Config];
init_per_testcase(TestCase, Config) ->
    StartOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
    StartedApps = emqx_cth_suite:start(apps_spec(), StartOpts),
    [{suite_apps, StartedApps} | Config].
%% Test case teardown: stops whatever init_per_testcase/2 stored in Config
%% (`cluster` for t_distributed_trace, `suite_apps` otherwise) and then
%% deletes the override config files.
end_per_testcase(TestCase, Config) ->
    case TestCase of
        t_distributed_trace ->
            emqx_cth_cluster:stop(?config(cluster, Config));
        _ ->
            emqx_cth_suite:stop(?config(suite_apps, Config))
    end,
    emqx_config:delete_override_conf_files(),
    ok.
%% Publishes messages carrying a `traceparent` user property and polls Jaeger
%% until the expected traces show up:
%%  - sampled traceparent, topic with two subscribers: one trace with 3 spans;
%%  - sampled traceparent, topic without subscribers: one trace with 1 span;
%%  - not sampled traceparents: no traces at all.
t_trace(Config) ->
    HostPort = mqtt_host_port(),
    {ok, _} = emqx_conf:update(?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}),
    TestName = atom_to_binary(?FUNCTION_NAME),
    SubscribedTopic = <<"t/trace/test/", TestName/binary>>,
    UnsubscribedTopic = <<"t/trace/test/nosub/", TestName/binary>>,
    %% Subscribers are connected and subscribed one after another
    Subscribers = [
        begin
            Sub = connect(HostPort, SubClientId),
            {ok, _, [0]} = emqtt:subscribe(Sub, SubscribedTopic),
            Sub
        end
     || SubClientId <- [<<"sub1">>, <<"sub2">>]
    ],
    Publisher = connect(HostPort, <<"pub">>),

    Sampled = traceparent(true),
    NotSampled = traceparent(false),
    ok = emqtt:publish(Publisher, SubscribedTopic, props(Sampled), <<"must be traced">>, []),
    ok = emqtt:publish(
        Publisher, SubscribedTopic, props(NotSampled), <<"must not be traced">>, []
    ),

    SampledNoSub = traceparent(true),
    NotSampledNoSub = traceparent(false),
    ok = emqtt:publish(Publisher, UnsubscribedTopic, props(SampledNoSub), <<"must be traced">>, []),
    ok = emqtt:publish(
        Publisher, UnsubscribedTopic, props(NotSampledNoSub), <<"must not be traced">>, []
    ),

    %% Polled condition: any failed match below raises, the last expression is a boolean
    TracesAreExported = fun() ->
        {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
        [SubTrace] = filter_traces(trace_id(Sampled), Traces),
        [] = filter_traces(trace_id(NotSampled), Traces),
        [NoSubTrace] = filter_traces(trace_id(SampledNoSub), Traces),
        [] = filter_traces(trace_id(NotSampledNoSub), Traces),
        #{<<"spans">> := SubSpans, <<"processes">> := _} = SubTrace,
        %% 2 sub spans and 1 publish process span
        HasAllSubSpans = length(SubSpans) =:= 3,
        #{<<"spans">> := NoSubSpans, <<"processes">> := _} = NoSubTrace,
        %% Only 1 publish process span
        HasAllSubSpans andalso 1 =:= length(NoSubSpans)
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, TracesAreExported, 10_000)
    ),
    stop_conns(Subscribers ++ [Publisher]).
%% Checks that with traces disabled in the config:
%%  - the noop tracer is installed and no external trace provider is set;
%%  - a published message is still delivered to the subscriber with its
%%    `traceparent` user property intact;
%%  - the same holds after emqx_otel_trace:toggle_registered(true) is called
%%    while the tracer is still not running.
t_trace_disabled(_Config) ->
    ?assertNot(emqx:get_config(?CONF_PATH ++ [traces, enable])),
    %% Tracer must be actually disabled
    ?assertEqual({otel_tracer_noop, []}, opentelemetry:get_tracer()),
    ?assertEqual(undefined, emqx_external_trace:provider()),
    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
    SubConn = connect(mqtt_host_port(), <<"sub">>),
    {ok, _, [0]} = emqtt:subscribe(SubConn, Topic),
    PubConn = connect(mqtt_host_port(), <<"pub">>),
    %% Publishes an empty message with the given traceparent and waits for it in
    %% this process' mailbox. The publish result is asserted (as in the other
    %% test cases) so that a failed publish is reported right away instead of
    %% as a receive timeout 10 seconds later.
    PublishAndExpectDelivery = fun(ExpectedTraceParent) ->
        ok = emqtt:publish(PubConn, Topic, props(ExpectedTraceParent), <<>>, []),
        receive
            {publish, #{topic := Topic, properties := Props}} ->
                #{'User-Property' := [{<<"traceparent">>, ReceivedTraceParent}]} = Props,
                ?assertEqual(ExpectedTraceParent, ReceivedTraceParent)
        after 10_000 ->
            ct:fail("published_message_not_received")
        end
    end,
    %% traceparent must be propagated by EMQX even if internal otel trace is disabled
    PublishAndExpectDelivery(traceparent(true)),
    %% if otel trace is registered but is actually not running, EMQX must work fine
    %% and the message must be delivered to the subscriber
    ok = emqx_otel_trace:toggle_registered(true),
    PublishAndExpectDelivery(traceparent(true)),
    stop_conns([SubConn, PubConn]).
%% Enables traces with the `trace_all` filter, publishes a message WITHOUT any
%% traceparent from a uniquely named client, and polls Jaeger until exactly one
%% single-span trace tagged with that client id is found.
t_trace_all(Config) ->
    OtelConf = enabled_trace_conf(Config),
    OtelConf1 = emqx_utils_maps:deep_put([<<"traces">>, <<"filter">>], OtelConf, #{
        <<"trace_all">> => true
    }),
    {ok, _} = emqx_conf:update(?CONF_PATH, OtelConf1, #{override_to => cluster}),
    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
    %% The client id is what identifies the trace in Jaeger, so it is made unique
    ClientId = <<"pub-", (integer_to_binary(erlang:system_time(nanosecond)))/binary>>,
    PubConn = connect(mqtt_host_port(), ClientId),
    %% Assert the publish result (as the other test cases do): a failed publish
    %% should fail the test here rather than as a polling timeout
    ok = emqtt:publish(PubConn, Topic, #{}, <<>>, []),
    HasClientIdTag = fun(#{<<"key">> := K, <<"value">> := Val}) ->
        K =:= <<"messaging.client_id">> andalso Val =:= ClientId
    end,
    IsPublisherTrace = fun
        %% Only one span is expected as there are no subscribers
        (#{<<"spans">> := [#{<<"tags">> := Tags}]}) ->
            lists:any(HasClientIdTag, Tags);
        (#{<<"spans">> := _}) ->
            false
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(
            ?FUNCTION_NAME,
            ?LINE,
            fun() ->
                {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
                %% Expecting exactly 1 trace produced by this publisher
                length(lists:filter(IsPublisherTrace, Traces)) =:= 1
            end,
            10_000
        )
    ),
    stop_conns([PubConn]).
%% Runs against the 3-node cluster stored in Config under `cluster`:
%% one subscriber per node, publisher connected to the replicant.
%% Polls Jaeger until the sampled message yields a single trace with 4 spans:
%% three `send_published_message` spans, one per cluster node, and one
%% `process_message` span reported by the replicant. The not sampled message
%% must produce no trace.
t_distributed_trace(Config) ->
    [Core1, Core2, Repl] = Cluster = ?config(cluster, Config),
    UpdateArgs = [?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}],
    {ok, _} = rpc:call(Core1, emqx_conf, update, UpdateArgs),
    Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
    %% Subscribers are connected and subscribed one after another, node by node
    Subscribers = [
        begin
            Sub = connect(mqtt_host_port(Node), SubClientId),
            {ok, _, [0]} = emqtt:subscribe(Sub, Topic),
            Sub
        end
     || {Node, SubClientId} <- [{Core1, <<"sub1">>}, {Core2, <<"sub2">>}, {Repl, <<"sub3">>}]
    ],
    Publisher = connect(mqtt_host_port(Repl), <<"pub">>),
    Sampled = traceparent(true),
    NotSampled = traceparent(false),
    ok = emqtt:publish(Publisher, Topic, props(Sampled), <<"must be traced">>, []),
    ok = emqtt:publish(Publisher, Topic, props(NotSampled), <<"must not be traced">>, []),
    %% Polled condition: any failed match below raises, the last expression is a boolean
    TraceIsExported = fun() ->
        {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
        [Trace] = filter_traces(trace_id(Sampled), Traces),
        [] = filter_traces(trace_id(NotSampled), Traces),
        #{<<"spans">> := Spans, <<"processes">> := Procs} = Trace,
        %% 3 sub spans and 1 publish process span
        4 = length(Spans),
        [_, _, _] = SendSpans = filter_spans(<<"send_published_message">>, Spans),
        ClusterNodes = lists:sort([atom_to_binary(Node) || Node <- Cluster]),
        SendSpanNodes = lists:sort([span_node(Span, Procs) || Span <- SendSpans]),
        SentFromAllNodes = ClusterNodes =:= SendSpanNodes,
        [PubSpan] = filter_spans(<<"process_message">>, Spans),
        atom_to_binary(Repl) =:= span_node(PubSpan, Procs) andalso SentFromAllNodes
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, TraceIsExported, 10_000)
    ),
    stop_conns(Subscribers ++ [Publisher]).
%% Keeping this test in this SUITE as there is no separate module for logs.
%% Enables the logs section of the opentelemetry config at the current primary
%% log level, emits one structured (?SLOG) and one plain logger message, each
%% carrying a generated id, and polls the file at `logs_exporter_file_path`
%% until both ids are found in it.
t_log(Config) ->
    Level = emqx_logger:get_primary_log_level(),
    LogsSection = #{
        <<"enable">> => true,
        <<"level">> => atom_to_binary(Level),
        <<"scheduled_delay">> => <<"20ms">>
    },
    OtelConf = #{<<"logs">> => LogsSection, <<"exporter">> => exporter_conf(Config)},
    {ok, _} = emqx_conf:update(?CONF_PATH, OtelConf, #{override_to => cluster}),
    %% Ids are only needed for matching logs in the file exported by otel-collector
    SlogId = integer_to_binary(otel_id_generator:generate_trace_id()),
    ?SLOG(Level, #{msg => "otel_test_log_message", id => SlogId}),
    PlainId = integer_to_binary(otel_id_generator:generate_trace_id()),
    logger:Level("Ordinary log message, id: ~p", [PlainId]),
    BothIdsAreExported = fun() ->
        {ok, Exported} = file:read_file(?config(logs_exporter_file_path, Config)),
        lists:all(
            fun(LogId) -> binary:match(Exported, LogId) =/= nomatch end,
            [SlogId, PlainId]
        )
    end,
    ?assertEqual(
        ok,
        emqx_common_test_helpers:wait_for(?FUNCTION_NAME, ?LINE, BothIdsAreExported, 10_000)
    ).
%% Path of the group's log file: logs_exporter_filename/1 joined onto project_dir/1.
logs_exporter_file_path(Group, Config) ->
    ProjectDir = project_dir(Config),
    filename:join(ProjectDir, logs_exporter_filename(Group)).
%% Derives a directory from `priv_dir` in Config by keeping only the path
%% components that precede the first "_build" component.
project_dir(Config) ->
    PrivDirParts = filename:split(?config(priv_dir, Config)),
    IsNotBuildDir = fun(Part) -> Part =/= "_build" end,
    filename:join(lists:takewhile(IsNotBuildDir, PrivDirParts)).
%% Relative path of the JSON log file for the given group (`tcp` or `tls`).
logs_exporter_filename(Group) when Group =:= tcp; Group =:= tls ->
    case Group of
        tcp -> ".ci/docker-compose-file/otel/otel-collector.json";
        tls -> ".ci/docker-compose-file/otel/otel-collector-tls.json"
    end.
%% Raw opentelemetry config with traces enabled (50ms scheduled delay) and the
%% exporter built by exporter_conf/1 from the test case config.
enabled_trace_conf(TcConfig) ->
    TracesSection = #{<<"enable">> => true, <<"scheduled_delay">> => <<"50ms">>},
    #{
        <<"traces">> => TracesSection,
        <<"exporter">> => exporter_conf(TcConfig)
    }.
%% Exporter config whose endpoint is the `otel_collector_url` value of the
%% test case config.
exporter_conf(TcConfig) ->
    Endpoint = ?config(otel_collector_url, TcConfig),
    #{<<"endpoint">> => Endpoint}.
%% Looks up the span's process (by `processID`) in the trace's `processes` map
%% and returns the value of its `service.instance.id` tag.
%% Exactly one such tag must be present, otherwise the match fails.
span_node(#{<<"processID">> := ProcId}, Procs) ->
    IsInstanceIdTag = fun(#{<<"key">> := TagKey}) ->
        TagKey =:= <<"service.instance.id">>
    end,
    #{ProcId := #{<<"tags">> := ProcTags}} = Procs,
    [#{<<"value">> := InstanceId}] = lists:filter(IsInstanceIdTag, ProcTags),
    InstanceId.
%% Extracts the 32-byte trace id that follows the "00-" prefix of a traceparent.
trace_id(<<"00-", TraceId:32/binary, _Rest/binary>>) -> TraceId.
%% Keeps only the traces whose `traceID` equals TraceId.
filter_traces(TraceId, Traces) ->
    HasTraceId = fun(#{<<"traceID">> := CandidateId}) -> CandidateId =:= TraceId end,
    lists:filter(HasTraceId, Traces).
%% Keeps only the spans whose `operationName` equals OpName.
filter_spans(OpName, Spans) ->
    HasOpName = fun(#{<<"operationName">> := SpanOpName}) -> SpanOpName =:= OpName end,
    lists:filter(HasOpName, Spans).
%% Requests the traces of the ?OTEL_SERVICE_NAME service from the Jaeger HTTP API
%% located at JaegerBaseURL (a string).
%% Returns `{ok, DecodedBody}` for a 200 response. Any other result of
%% httpc:request/1 is logged and returned as is, so callers matching on
%% `{ok, #{...}}` fail on it.
get_jaeger_traces(JaegerBaseURL) ->
    URL = JaegerBaseURL ++ "/api/traces?service=" ++ ?OTEL_SERVICE_NAME,
    case httpc:request(URL) of
        {ok, {{_, 200, _}, _, RespBody}} ->
            {ok, emqx_utils_json:decode(RespBody)};
        Err ->
            %% ct:pal/2 expects a LIST of format arguments: the error term has
            %% to be wrapped, otherwise it cannot be formatted with "~p"
            ct:pal("Jager error: ~p", [Err]),
            Err
    end.
%% Calls emqtt:stop/1 on every connection in the list, in order.
stop_conns(Conns) ->
    _ = [emqtt:stop(Conn) || Conn <- Conns],
    ok.
%% Properties map with a single `traceparent` User-Property.
props(TraceParent) ->
    UserProperties = [{<<"traceparent">>, TraceParent}],
    #{'User-Property' => UserProperties}.
%% Builds a traceparent binary "00-<trace id>-<span id>-<flags>" from freshly
%% generated ids: the trace id is formatted as 32 and the span id as 16
%% zero-padded hex digits; flags are "01" when IsSampled is true, "00" when false.
traceparent(IsSampled) ->
    NewTraceId = otel_id_generator:generate_trace_id(),
    NewSpanId = otel_id_generator:generate_span_id(),
    {ok, TraceIdHex} = otel_utils:format_binary_string("~32.16.0b", [NewTraceId]),
    {ok, SpanIdHex} = otel_utils:format_binary_string("~16.16.0b", [NewSpanId]),
    SampledFlag =
        case IsSampled of
            true -> <<"01">>;
            false -> <<"00">>
        end,
    <<"00-", TraceIdHex/binary, $-, SpanIdHex/binary, $-, SampledFlag/binary>>.
%% Starts a linked emqtt client (MQTT v5) for the given host, port and client id,
%% connects it, and returns the client pid. Fails with a badmatch if either
%% step does not return `{ok, _}`.
connect({Host, Port}, ClientId) ->
    ClientOpts = [
        {proto_ver, v5},
        {host, Host},
        {port, Port},
        {clientid, ClientId}
    ],
    {ok, Client} = emqtt:start_link(ClientOpts),
    {ok, _} = emqtt:connect(Client),
    Client.
%% `bind` setting of the default TCP listener, read from the local config.
%% Callers pass the result to connect/2, which destructures it as {Host, Port}.
mqtt_host_port() ->
    BindConfPath = [listeners, tcp, default, bind],
    emqx:get_config(BindConfPath).
%% Same as mqtt_host_port/0, but the config is read on Node via rpc:call/4.
mqtt_host_port(Node) ->
    BindConfPath = [listeners, tcp, default, bind],
    rpc:call(Node, emqx, get_config, [BindConfPath]).
%% Starts (via emqx_cth_cluster:start/2) two core nodes and one replicant, each
%% with the apps from apps_spec/0, using a per-test-case work dir.
%% Returns whatever emqx_cth_cluster:start/2 returns; t_distributed_trace
%% matches it as a 3-element list [Core1, Core2, Replicant].
cluster(TC, Config) ->
    NodeRoles = [
        {otel_trace_core1, core},
        {otel_trace_core2, core},
        {otel_trace_replicant, replicant}
    ],
    NodeSpecs = [{Name, #{role => Role, apps => apps_spec()}} || {Name, Role} <- NodeRoles],
    emqx_cth_cluster:start(NodeSpecs, #{work_dir => emqx_cth_suite:work_dir(TC, Config)}).
%% Applications passed to emqx_cth_suite:start/2 and used in the cluster node specs.
apps_spec() ->
    [emqx, emqx_conf, emqx_management, emqx_opentelemetry].