emqx/test/emqx_trace_handler_SUITE.erl

218 lines
8.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace_handler_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLIENT, [{host, "localhost"},
{clientid, <<"client">>},
{username, <<"testuser">>},
{password, <<"pass">>}
]).
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address].
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(t_trace_clientid, Config) ->
Config;
init_per_testcase(_Case, Config) ->
ok = emqx_logger:set_log_level(debug),
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
Config.
end_per_testcase(_Case, _Config) ->
ok = emqx_logger:set_log_level(warning),
ok.
t_trace_clientid(_Config) ->
%% Start tracing
emqx_logger:set_log_level(error),
{error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"),
emqx_logger:set_log_level(debug),
%% add list clientid
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
{error, {invalid_log_level, bad_level}} =
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
{error, {handler_not_added, {file_error, ".", eisdir}}} =
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/client.log")),
?assert(filelib:is_regular("tmp/client2.log")),
?assert(filelib:is_regular("tmp/client3.log")),
%% Get current traces
?assertMatch([#{type := clientid, filter := "client", name := <<"client">>,
level := debug, dst := "tmp/client.log"},
#{type := clientid, filter := "client2", name := <<"client2">>
, level := debug, dst := "tmp/client2.log"},
#{type := clientid, filter := "client3", name := <<"client3">>,
level := debug, dst := "tmp/client3.log"}
], emqx_trace_handler:running()),
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
{ok, T} = emqtt:start_link(?CLIENT),
emqtt:connect(T),
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
emqtt:ping(T),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
{ok, Bin} = file:read_file("tmp/client.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])),
?assert(filelib:file_size("tmp/client2.log") == 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(clientid, <<"client">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client2">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client3">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
t_trace_topic(_Config) ->
{ok, T} = emqtt:start_link(?CLIENT),
emqtt:connect(T),
%% Start tracing
emqx_logger:set_log_level(debug),
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
?assert(filelib:is_regular("tmp/topic_trace_y.log")),
%% Get current traces
?assertMatch([#{type := topic, filter := <<"x/#">>,
level := debug, dst := "tmp/topic_trace_x.log", name := <<"x/#">>},
#{type := topic, filter := <<"y/#">>,
name := <<"y/#">>, level := debug, dst := "tmp/topic_trace_y.log"}
],
emqx_trace_handler:running()),
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(topic, <<"x/#">>),
ok = emqx_trace_handler:uninstall(topic, <<"y/#">>),
{error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
t_trace_ip_address(_Config) ->
{ok, T} = emqtt:start_link(?CLIENT),
emqtt:connect(T),
%% Start tracing
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
?assert(filelib:is_regular("tmp/ip_trace_y.log")),
%% Get current traces
?assertMatch([#{type := ip_address, filter := "127.0.0.1",
name := <<"127.0.0.1">>,
level := debug, dst := "tmp/ip_trace_x.log"},
#{type := ip_address, filter := "192.168.1.1",
name := <<"192.168.1.1">>,
level := debug, dst := "tmp/ip_trace_y.log"}
],
emqx_trace_handler:running()),
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>),
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
filesync(Name, Type) ->
ct:sleep(50),
filesync(Name, Type, 3).
%% sometime the handler process is not started yet.
filesync(_Name, _Type, 0) -> ok;
filesync(Name, Type, Retry) ->
try
Handler = binary_to_atom(<<"trace_",
(atom_to_binary(Type))/binary, "_", Name/binary>>),
ok = logger_disk_log_h:filesync(Handler)
catch E:R ->
ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
ct:sleep(100),
filesync(Name, Type, Retry - 1)
end.