%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Common Test suite exercising the emqx_trace API: create/list/format/
%% delete/update of traces and the log files they produce.
-module(emqx_trace_SUITE).

%% API
%% Every function is exported so that all/0 can discover the test cases.
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").

%% Record shape the test cases pattern-match against the elements returned
%% by emqx_trace:list/0.
%% NOTE(review): presumably mirrors the record defined inside emqx_trace;
%% keep the two in sync -- confirm against that module.
-record(emqx_trace, {
    name,
    type,
    topic,
    clientid,
    packets = [],
    enable = true,
    start_at,
    end_at,
    log_size = #{}
}).

%% Packet type atoms used by the tests that request tracing of every packet.
-define(PACKETS, [
    'CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL',
    'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK',
    'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH'
]).

%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------

%% Test case list, delegated to emqx_ct:all/1 for this module.
all() ->
    emqx_ct:all(?MODULE).

%% Suite setup: loads the emqx_plugin_libs application and starts the
%% applications through emqx_ct_helpers with an empty extra-app list.
%% The result of application:load/1 is deliberately not checked.
init_per_suite(Config) ->
    _ = application:load(emqx_plugin_libs),
    emqx_ct_helpers:start_apps([]),
    Config.

%% Suite teardown: stops what init_per_suite/1 started.
end_per_suite(_Config) ->
    emqx_ct_helpers:stop_apps([]).
%% Creates one clientid trace and checks:
%%   * creating it again is rejected as already_existed,
%%   * a differently named trace with the same condition is rejected as
%%     duplicate_condition,
%%   * the stored record and its formatted map carry the expected fields,
%%   * delete succeeds once and then reports not_found.
t_base_create_delete(_Config) ->
    ok = emqx_trace:clear(),
    Now = erlang:system_time(second),
    Start = to_rfc3339(Now),
    End = to_rfc3339(Now + 30 * 60),
    Name = <<"name1">>,
    ClientId = <<"test-device">>,
    Packets = [atom_to_binary(E) || E <- ?PACKETS],
    Trace = [
        {<<"name">>, Name},
        {<<"type">>, <<"clientid">>},
        {<<"clientid">>, ClientId},
        {<<"packets">>, Packets},
        {<<"start_at">>, Start},
        {<<"end_at">>, End}
    ],
    %% Same condition (clientid) under another name.
    AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}),
    ok = emqx_trace:create(Trace),
    ?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)),
    ?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)),
    [TraceRec] = emqx_trace:list(),
    Expect = #emqx_trace{
        name = Name,
        type = clientid,
        topic = undefined,
        clientid = ClientId,
        packets = ?PACKETS,
        start_at = Now,
        end_at = Now + 30 * 60
    },
    ?assertEqual(Expect, TraceRec),
    %% The formatted view renders the timestamps back as RFC 3339 binaries.
    ExpectFormat = [
        #{
            clientid => ClientId,
            enable => true,
            type => clientid,
            packets => ?PACKETS,
            name => Name,
            start_at => Start,
            end_at => End,
            log_size => #{},
            topic => undefined
        }
    ],
    ?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])),
    ?assertEqual(ok, emqx_trace:delete(Name)),
    ?assertEqual({error, not_found}, emqx_trace:delete(Name)),
    ?assertEqual([], emqx_trace:list()),
    ok.

%% Creates 30 topic traces, expects the 31st creation to be rejected, and
%% expects it to succeed once one of the 30 has been deleted.
t_create_size_max(_Config) ->
    ok = emqx_trace:clear(),
    %% foreach, not map: only the side effect of create/1 matters here.
    lists:foreach(
        fun(Seq) ->
            SeqStr = integer_to_list(Seq),
            Trace = [
                {<<"name">>, list_to_binary("name" ++ SeqStr)},
                {<<"type">>, <<"topic">>},
                {<<"packets">>, [<<"PUBLISH">>]},
                {<<"topic">>, list_to_binary("/x/y/" ++ SeqStr)}
            ],
            ok = emqx_trace:create(Trace)
        end,
        lists:seq(1, 30)
    ),
    Trace31 = [
        {<<"name">>, <<"name31">>},
        {<<"type">>, <<"topic">>},
        {<<"packets">>, [<<"PUBLISH">>]},
        {<<"topic">>, <<"/x/y/31">>}
    ],
    {error, _} = emqx_trace:create(Trace31),
    ok = emqx_trace:delete(<<"name30">>),
    ok = emqx_trace:create(Trace31),
    ?assertEqual(30, erlang:length(emqx_trace:list())),
    ok.
%% Feeds emqx_trace:create/1 a series of invalid property lists and checks
%% the exact error reason returned for each one.
t_create_failed(_Config) ->
    ok = emqx_trace:clear(),
    %% {Properties, ExpectedReasonText}: the reason is an iolist, so it is
    %% flattened to a binary before comparison.
    IolistCases = [
        {[{<<"unknown">>, 12}],
         <<"unknown field: {<<\"unknown\">>,12}">>},
        {[{<<"topic">>, "#/#//"}],
         <<"#/#// invalid by function_clause">>},
        {[{<<"start_at">>, <<"2021-12-3:12">>}],
         <<"The rfc3339 specification not satisfied: 2021-12-3:12">>},
        {[{<<"end_at">>, <<"2021-12-3:12">>}],
         <<"The rfc3339 specification not satisfied: 2021-12-3:12">>},
        {[{<<"packets">>, [<<"publish">>]}],
         <<"unsupport packets: [publish]">>},
        {[{<<"packets">>, <<"publish">>}],
         <<"unsupport packets: <<\"publish\">>">>},
        {[{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}],
         <<"topic/clientid cannot be both empty">>},
        {[{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}],
         <<"name cannot contain /">>}
    ],
    lists:foreach(
        fun({Props, ExpectedText}) ->
            {error, Reason} = emqx_trace:create(Props),
            ?assertEqual(ExpectedText, iolist_to_binary(Reason))
        end,
        IolistCases
    ),
    %% These two reasons are compared as plain strings, without flattening.
    MissingType = [
        {<<"name">>, <<"test-name">>},
        {<<"packets">>, []},
        {<<"clientid">>, <<"good">>}
    ],
    ?assertEqual({error, "type required"}, emqx_trace:create(MissingType)),
    BadType = [
        {<<"name">>, <<"test-name">>},
        {<<"packets">>, []},
        {<<"clientid">>, <<"good">>},
        {<<"type">>, <<"typeerror">>}
    ],
    ?assertEqual(
        {error, "incorrect type: only support clientid/topic"},
        emqx_trace:create(BadType)
    ),
    ok.
%% Checks required-field and time-window validation in create/1, and the
%% defaults applied when start_at/end_at are omitted.
t_create_default(_Config) ->
    ok = emqx_trace:clear(),
    {error, "name required"} = emqx_trace:create([]),

    %% An explicitly empty packet list is stored as-is.
    ClientTrace = [
        {<<"name">>, <<"test-name">>},
        {<<"type">>, <<"clientid">>},
        {<<"packets">>, []},
        {<<"clientid">>, <<"good">>}
    ],
    ok = emqx_trace:create(ClientTrace),
    [#emqx_trace{packets = StoredPackets}] = emqx_trace:list(),
    ?assertEqual([], StoredPackets),

    ok = emqx_trace:clear(),
    %% Both timestamps lie in the past.
    PastTrace = [
        {<<"name">>, <<"test-name">>},
        {<<"packets">>, [<<"PUBLISH">>]},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/x/y/z">>},
        {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
        {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
    ],
    {error, "end_at time has already passed"} = emqx_trace:create(PastTrace),

    %% Future window whose start lies after its end.
    Now = erlang:system_time(second),
    InvertedTrace = [
        {<<"name">>, <<"test-name">>},
        {<<"type">>, <<"topic">>},
        {<<"packets">>, [<<"PUBLISH">>]},
        {<<"topic">>, <<"/x/y/z">>},
        {<<"start_at">>, to_rfc3339(Now + 10)},
        {<<"end_at">>, to_rfc3339(Now + 3)}
    ],
    {error, "failed by start_at >= end_at"} = emqx_trace:create(InvertedTrace),

    %% No explicit window: expect a 10 minute span starting about now.
    DefaultTrace = [
        {<<"name">>, <<"test-name">>},
        {<<"type">>, <<"topic">>},
        {<<"packets">>, [<<"PUBLISH">>]},
        {<<"topic">>, <<"/x/y/z">>}
    ],
    ok = emqx_trace:create(DefaultTrace),
    [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
    ?assertEqual(10 * 60, End - Start),
    Drift = Start - erlang:system_time(second),
    ?assert(Drift < 5),
    ok.
%% Toggles the enable flag of a short-lived trace and checks that:
%%   * repeated updates to the same value are accepted,
%%   * updating an unknown name yields not_found,
%%   * once end_at has passed, update reports finished.
t_update_enable(_Config) ->
    ok = emqx_trace:clear(),
    Name = <<"test-name">>,
    Now = erlang:system_time(second),
    End = to_rfc3339(Now + 2),
    ok = emqx_trace:create([
        {<<"name">>, Name},
        {<<"packets">>, [<<"PUBLISH">>]},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/x/y/z">>},
        {<<"end_at">>, End}
    ]),
    [#emqx_trace{enable = Enable}] = emqx_trace:list(),
    %% Expected value first, so a failure report labels the values correctly.
    ?assertEqual(true, Enable),
    ok = emqx_trace:update(Name, false),
    [#emqx_trace{enable = false}] = emqx_trace:list(),
    ok = emqx_trace:update(Name, false),
    [#emqx_trace{enable = false}] = emqx_trace:list(),
    ok = emqx_trace:update(Name, true),
    [#emqx_trace{enable = true}] = emqx_trace:list(),
    ok = emqx_trace:update(Name, false),
    [#emqx_trace{enable = false}] = emqx_trace:list(),
    ?assertEqual({error, not_found}, emqx_trace:update(<<"Name not found">>, true)),
    %% Wait until the 2 second window created above has elapsed.
    ct:sleep(2100),
    ?assertEqual({error, finished}, emqx_trace:update(Name, true)),
    ok.

%% Creates a running and a not-yet-started trace (an already finished one
%% is rejected), then checks the enable flags before and after the running
%% trace's end_at has passed.
t_load_state(_Config) ->
    ok = emqx_trace:clear(),
    load(),
    Now = erlang:system_time(second),
    Running = [
        {<<"name">>, <<"Running">>},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/x/y/1">>},
        {<<"start_at">>, to_rfc3339(Now - 1)},
        {<<"end_at">>, to_rfc3339(Now + 2)}
    ],
    Waiting = [
        {<<"name">>, <<"Waiting">>},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/x/y/2">>},
        {<<"start_at">>, to_rfc3339(Now + 3)},
        {<<"end_at">>, to_rfc3339(Now + 8)}
    ],
    Finished = [
        {<<"name">>, <<"Finished">>},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/x/y/3">>},
        {<<"start_at">>, to_rfc3339(Now - 5)},
        {<<"end_at">>, to_rfc3339(Now)}
    ],
    ok = emqx_trace:create(Running),
    ok = emqx_trace:create(Waiting),
    {error, "end_at time has already passed"} = emqx_trace:create(Finished),
    %% Projects a formatted trace map onto {Name, Enable}.
    ToNameEnable = fun(#{name := Name, enable := Enable}) -> {Name, Enable} end,
    Traces = emqx_trace:format(emqx_trace:list()),
    ?assertEqual(2, erlang:length(Traces)),
    Enables = lists:map(ToNameEnable, Traces),
    ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}],
    ?assertEqual(ExpectEnables, lists:sort(Enables)),
    %% After 3.5 s "Running" is past its end_at (Now + 2) and "Waiting" is
    %% past its start_at (Now + 3).
    ct:sleep(3500),
    Traces2 = emqx_trace:format(emqx_trace:list()),
    ?assertEqual(2, erlang:length(Traces2)),
    Enables2 = lists:map(ToNameEnable, Traces2),
    ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
    ?assertEqual(ExpectEnables2, lists:sort(Enables2)),
    unload(),
    ok.

%% Connects an MQTT client while a clientid trace is active, later adds a
%% topic trace, and checks that both trace log files receive data and that
%% the client log keeps growing as the client continues to publish.
t_client_event(_Config) ->
    application:set_env(emqx, allow_anonymous, true),
    ok = emqx_trace:clear(),
    ClientId = <<"client-test">>,
    load(),
    Now = erlang:system_time(second),
    Start = to_rfc3339(Now),
    Name = <<"test_client_id_event">>,
    TopicName = <<"test_topic">>,
    ok = emqx_trace:create([
        {<<"name">>, Name},
        {<<"type">>, <<"clientid">>},
        {<<"clientid">>, ClientId},
        {<<"start_at">>, Start}
    ]),
    ct:sleep(200),
    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(Client),
    emqtt:ping(Client),
    ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
    ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
    ct:sleep(200),
    ok = emqx_trace:create([
        {<<"name">>, TopicName},
        {<<"type">>, <<"topic">>},
        {<<"topic">>, <<"/test">>},
        {<<"start_at">>, Start}
    ]),
    ct:sleep(200),
    %% First snapshot of the client log, taken before the second batch.
    {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
    ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
    ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
    ok = emqtt:disconnect(Client),
    ct:sleep(200),
    {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
    {ok, Bin3} = file:read_file(emqx_trace:log_file(TopicName, Now)),
    ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
    ?assert(erlang:byte_size(Bin) > 0),
    ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
    ?assert(erlang:byte_size(Bin3) > 0),
    unload(),
    ok.
%% Checks get_trace_filename/1: not_found for an unknown trace, and an
%% ok-tagged tuple for an existing trace both while its window is open and
%% after its end_at has passed.
t_get_log_filename(_Config) ->
    ok = emqx_trace:clear(),
    load(),
    Now = erlang:system_time(second),
    Start = to_rfc3339(Now),
    End = to_rfc3339(Now + 2),
    Name = <<"name1">>,
    ClientId = <<"test-device">>,
    Packets = [atom_to_binary(E) || E <- ?PACKETS],
    Trace = [
        {<<"name">>, Name},
        {<<"type">>, <<"clientid">>},
        {<<"clientid">>, ClientId},
        {<<"packets">>, Packets},
        {<<"start_at">>, Start},
        {<<"end_at">>, End}
    ],
    ok = emqx_trace:create(Trace),
    ?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)),
    ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
    %% Sleep past end_at (Now + 2) and query again.
    ct:sleep(3000),
    ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
    unload(),
    ok.

%% Writes a file into the trace directory and checks that trace_file/1
%% returns this node's name (as a string) together with the file content.
t_trace_file(_Config) ->
    FileName = "test.log",
    Content = <<"test \n test">>,
    TraceDir = emqx_trace:trace_dir(),
    File = filename:join(TraceDir, FileName),
    ok = file:write_file(File, Content),
    %% Delete the fixture even when an assertion fails, so that no stray
    %% file is left behind in the trace directory.
    try
        {ok, Node, Bin} = emqx_trace:trace_file(FileName),
        ?assertEqual(atom_to_list(node()), Node),
        ?assertEqual(Content, Bin)
    after
        ok = file:delete(File)
    end,
    ok.

%% Generates some traffic for a clientid trace and checks that
%% download_zip_log/2 yields a non-empty zip file.
t_download_log(_Config) ->
    ok = emqx_trace:clear(),
    load(),
    ClientId = <<"client-test">>,
    Now = erlang:system_time(second),
    Start = to_rfc3339(Now),
    Name = <<"test_client_id">>,
    ok = emqx_trace:create([
        {<<"name">>, Name},
        {<<"type">>, <<"clientid">>},
        {<<"clientid">>, ClientId},
        {<<"start_at">>, Start}
    ]),
    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(Client),
    %% Five pings; only the traffic matters, the replies are ignored.
    lists:foreach(fun(_) -> emqtt:ping(Client) end, lists:seq(1, 5)),
    ct:sleep(100),
    {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
    ?assert(filelib:file_size(ZipFile) > 0),
    ok = emqtt:disconnect(Client),
    unload(),
    ok.

%% Renders a system time in seconds as an RFC 3339 binary.
to_rfc3339(Second) ->
    list_to_binary(calendar:system_time_to_rfc3339(Second)).

%% Starts the emqx_trace server linked to the calling test process and
%% returns whatever start_link/0 returns.
load() ->
    emqx_trace:start_link().

%% Stops the emqx_trace server that load/0 started.
unload() ->
    gen_server:stop(emqx_trace).