diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index f85858be3..d5fd423d3 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -20,10 +20,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -%% Mnesia bootstrap --export([mnesia/1]). - -boot_mnesia({mnesia, [boot]}). +-export([mnesia/1]). -export([ publish/1 , subscribe/3 @@ -54,20 +52,22 @@ -define(MAX_SIZE, 30). -ifdef(TEST). --export([log_file/2]). +-export([ log_file/2 + , find_closest_time/2 + ]). -endif. -export_type([ip_address/0]). -type ip_address() :: string(). --record(?TRACE, - { name :: binary() | undefined | '_' - , type :: clientid | topic | ip_address | undefined | '_' - , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_' - , enable = true :: boolean() | '_' - , start_at :: integer() | undefined | '_' - , end_at :: integer() | undefined | '_' - }). +-record(?TRACE, { + name :: binary() | undefined | '_' + , type :: clientid | topic | ip_address | undefined | '_' + , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_' + , enable = true :: boolean() | '_' + , start_at :: integer() | undefined | '_' + , end_at :: integer() | undefined | '_' + }). mnesia(boot) -> ok = mria:create_table(?TRACE, [ @@ -279,24 +279,22 @@ stop_all_trace_handler() -> lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, emqx_trace_handler:running()). get_enable_trace() -> - {atomic, Traces} = - mria:transaction(?COMMON_SHARD, fun() -> - mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) - end), - Traces. + transaction(fun() -> + mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) + end). find_closest_time(Traces, Now) -> Sec = lists:foldl( - fun(#?TRACE{start_at = Start, end_at = End}, Closest) - when Start >= Now andalso Now < End -> %% running - min(End - Now, Closest); - (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting - min(Now - Start, Closest); - (_, Closest) -> Closest %% finished + fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) -> + min(closest(End, Now, Closest), closest(Start, Now, Closest)); + (_, Closest) -> Closest end, 60 * 15, Traces), timer:seconds(Sec). +closest(Time, Now, Closest) when Now >= Time -> Closest; +closest(Time, Now, Closest) -> min(Time - Now, Closest). + disable_finished([]) -> ok; disable_finished(Traces) -> transaction(fun() -> @@ -367,29 +365,31 @@ classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). to_trace(TraceParam) -> - case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of + case to_trace(ensure_map(TraceParam), #?TRACE{}) of {error, Reason} -> {error, Reason}; {ok, #?TRACE{name = undefined}} -> {error, "name required"}; {ok, #?TRACE{type = undefined}} -> {error, "type=[topic,clientid,ip_address] required"}; - {ok, #?TRACE{filter = undefined}} -> - {error, "topic/clientid/ip_address filter required"}; - {ok, TraceRec0} -> + {ok, TraceRec0 = #?TRACE{}} -> case fill_default(TraceRec0) of #?TRACE{start_at = Start, end_at = End} when End =< Start -> {error, "failed by start_at >= end_at"}; - TraceRec -> {ok, TraceRec} + TraceRec -> + {ok, TraceRec} end end. -ensure_proplists(#{} = Trace) -> maps:to_list(Trace); -ensure_proplists(Trace) when is_list(Trace) -> +ensure_map(#{} = Trace) -> + maps:fold(fun(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; + (K, V, Acc) when is_atom(K) -> Acc#{K => V} + end, #{}, Trace); +ensure_map(Trace) when is_list(Trace) -> lists:foldl( - fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc]; - ({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc]; + fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; + ({K, V}, Acc) when is_atom(K) -> Acc#{K => V}; (_, Acc) -> Acc - end, [], Trace). + end, #{}, Trace). fill_default(Trace = #?TRACE{start_at = undefined}) -> fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); @@ -397,49 +397,47 @@ fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60}); fill_default(Trace) -> Trace. -to_trace([], Rec) -> {ok, Rec}; -to_trace([{name, Name} | Trace], Rec) -> - case io_lib:printable_unicode_list(unicode:characters_to_list(Name, utf8)) of - true -> - case binary:match(Name, [<<"/">>], []) of - nomatch -> to_trace(Trace, Rec#?TRACE{name = Name}); - _ -> {error, "name cannot contain /"} - end; - false -> {error, "name must printable unicode"} +-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). + +to_trace(#{name := Name} = Trace, Rec) -> + case re:run(Name, ?NAME_RE) of + nomatch -> {error, "Name should be " ?NAME_RE}; + _ -> to_trace(maps:remove(name, Trace), Rec#?TRACE{name = Name}) end; -to_trace([{type, Type} | Trace], Rec) -> - case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of - true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); - false -> {error, "incorrect type: only support clientid/topic/ip_address"} +to_trace(#{type := clientid, clientid := Filter} = Trace, Rec) -> + Trace0 = maps:without([type, clientid], Trace), + to_trace(Trace0, Rec#?TRACE{type = clientid, filter = Filter}); +to_trace(#{type := topic, topic := Filter} = Trace, Rec) -> + case validate_topic(Filter) of + ok -> + Trace0 = maps:without([type, topic], Trace), + to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter}); + Error -> Error end; -to_trace([{topic, Topic} | Trace], Rec) -> - case validate_topic(Topic) of - ok -> to_trace(Trace, Rec#?TRACE{filter = Topic}); - {error, Reason} -> {error, Reason} +to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> + case validate_ip_address(Filter) of + ok -> + Trace0 = maps:without([type, ip_address], Trace), + to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter}); + Error -> Error end; -to_trace([{clientid, ClientId} | Trace], Rec) -> - to_trace(Trace, Rec#?TRACE{filter = ClientId}); -to_trace([{ip_address, IP} | Trace], Rec) -> - case inet:parse_address(binary_to_list(IP)) of - {ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)}); - {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} - end; -to_trace([{start_at, StartAt} | Trace], Rec) -> +to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; +to_trace(#{start_at := StartAt} = Trace, Rec) -> case to_system_second(StartAt) of - {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); + {ok, Sec} -> to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec}); {error, Reason} -> {error, Reason} end; -to_trace([{end_at, EndAt} | Trace], Rec) -> +to_trace(#{end_at := EndAt} = Trace, Rec) -> Now = erlang:system_time(second), case to_system_second(EndAt) of {ok, Sec} when Sec > Now -> - to_trace(Trace, Rec#?TRACE{end_at = Sec}); + to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec}); {ok, _Sec} -> {error, "end_at time has already passed"}; {error, Reason} -> {error, Reason} end; -to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. +to_trace(_, Rec) -> {ok, Rec}. validate_topic(TopicName) -> try emqx_topic:validate(filter, TopicName) of @@ -448,11 +446,17 @@ validate_topic(TopicName) -> error:Error -> {error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])} end. +validate_ip_address(IP) -> + case inet:parse_address(binary_to_list(IP)) of + {ok, _} -> ok; + {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} + end. to_system_second(At) -> try Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]), - {ok, Sec} + Now = erlang:system_time(second), + {ok, erlang:max(Now, Sec)} catch error: {badmatch, _} -> {error, ["The rfc3339 specification not satisfied: ", At]} end. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 27bada946..c76bf1aa9 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -38,26 +38,10 @@ -export([handler_id/2]). -type tracer() :: #{ - name := binary(), - type := clientid | topic | ip_address, - filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address() - }. - --define(FORMAT, - {logger_formatter, #{ - template => [ - time, " [", level, "] ", - {clientid, - [{peername, [clientid, "@", peername, " "], [clientid, " "]}], - [{peername, [peername, " "], []}] - }, - msg, "\n" - ], - single_line => false, - max_size => unlimited, - depth => unlimited - }} -). + name := binary(), + type := clientid | topic | ip_address, + filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address() + }. -define(CONFIG(_LogFile_), #{ type => halt, @@ -68,25 +52,25 @@ overload_kill_qlen => 20000, %% disable restart overload_kill_restart_after => infinity - }). +}). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ -spec install(Name :: binary() | list(), - Type :: clientid | topic | ip_address, - Filter ::emqx_types:clientid() | emqx_types:topic() | string(), - Level :: logger:level() | all, - LogFilePath :: string()) -> ok | {error, term()}. + Type :: clientid | topic | ip_address, + Filter :: emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string()) -> ok | {error, term()}. install(Name, Type, Filter, Level, LogFile) -> Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)}, install(Who, Level, LogFile). -spec install(Type :: clientid | topic | ip_address, - Filter ::emqx_types:clientid() | emqx_types:topic() | string(), - Level :: logger:level() | all, - LogFilePath :: string()) -> ok | {error, term()}. + Filter :: emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string()) -> ok | {error, term()}. install(Type, Filter, Level, LogFile) -> install(Filter, Type, Filter, Level, LogFile). @@ -111,7 +95,7 @@ install(Who, Level, LogFile) -> end. -spec uninstall(Type :: clientid | topic | ip_address, - Name :: binary() | list()) -> ok | {error, term()}. + Name :: binary() | list()) -> ok | {error, term()}. uninstall(Type, Name) -> HandlerId = handler_id(ensure_bin(Name), Type), uninstall(HandlerId). @@ -126,12 +110,12 @@ uninstall(HandlerId) -> -spec running() -> [ #{ - name => binary(), - type => topic | clientid | ip_address, - id => atom(), - filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(), - level => logger:level(), - dst => file:filename() | console | unknown + name => binary(), + type => topic | clientid | ip_address, + id => atom(), + filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(), + level => logger:level(), + dst => file:filename() | console | unknown } ]. running() -> @@ -159,12 +143,13 @@ filter_ip_address(_Log, _ExpectId) -> ignore. install_handler(Who = #{name := Name, type := Type}, Level, LogFile) -> HandlerId = handler_id(Name, Type), - Config = #{ level => Level, - formatter => ?FORMAT, - filter_default => stop, - filters => filters(Who), - config => ?CONFIG(LogFile) - }, + Config = #{ + level => Level, + formatter => formatter(Who), + filter_default => stop, + filters => filters(Who), + config => ?CONFIG(LogFile) + }, Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), show_prompts(Res, Who, "Start trace"), Res. @@ -176,11 +161,36 @@ filters(#{type := topic, filter := Filter, name := Name}) -> filters(#{type := ip_address, filter := Filter, name := Name}) -> [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. +formatter(#{type := Type}) -> + {logger_formatter, + #{ + template => template(Type), + single_line => false, + max_size => unlimited, + depth => unlimited + } + }. + +%% Don't log clientid since clientid only supports exact match, all client ids are the same. +%% if clientid is not latin characters. the logger_formatter restricts the output must be `~tp` +%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read. +template(clientid) -> + [time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"]; +%% TODO better format when clientid is utf8. +template(_) -> + [time, " [", level, "] ", + {clientid, + [{peername, [clientid, "@", peername, " "], [clientid, " "]}], + [{peername, [peername, " "], []}] + }, + msg, "\n" + ]. + filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) -> Init = #{id => Id, level => Level, dst => Dst}, case Filters of [{Type, {_FilterFun, {Filter, Name}}}] when - Type =:= topic orelse + Type =:= topic orelse Type =:= clientid orelse Type =:= ip_address -> [Init#{type => Type, filter => Filter, name => Name} | Acc]; @@ -201,7 +211,7 @@ handler_id(Name, Type) -> do_handler_id(Name, Type) -> TypeStr = atom_to_list(Type), NameStr = unicode:characters_to_list(Name, utf8), - FullNameStr = "trace_" ++ TypeStr ++ "_" ++ NameStr, + FullNameStr = "trace_" ++ TypeStr ++ "_" ++ NameStr, true = io_lib:printable_unicode_list(FullNameStr), FullNameBin = unicode:characters_to_binary(FullNameStr, utf8), binary_to_atom(FullNameBin, utf8). @@ -209,7 +219,7 @@ do_handler_id(Name, Type) -> ensure_bin(List) when is_list(List) -> iolist_to_binary(List); ensure_bin(Bin) when is_binary(Bin) -> Bin. -ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin); +ensure_list(Bin) when is_binary(Bin) -> unicode:characters_to_list(Bin, utf8); ensure_list(List) when is_list(List) -> List. show_prompts(ok, Who, Msg) -> diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 3086dcdd7..10c6949ec 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -22,7 +22,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). --import(emqx_trace_handler_SUITE, [filesync/2]). + -record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}). %%-------------------------------------------------------------------- @@ -33,15 +33,22 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_plugin_libs), emqx_common_test_helpers:start_apps([]), Config. end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). -t_base_create_delete(_Config) -> +init_per_testcase(_, Config) -> + load(), ok = emqx_trace:clear(), + Config. + +end_per_testcase(_) -> + unload(), + ok. + +t_base_create_delete(_Config) -> Now = erlang:system_time(second), Start = to_rfc3339(Now), End = to_rfc3339(Now + 30 * 60), @@ -49,7 +56,7 @@ t_base_create_delete(_Config) -> ClientId = <<"test-device">>, Trace = #{ name => Name, - type => <<"clientid">>, + type => clientid, clientid => ClientId, start_at => Start, end_at => End @@ -84,15 +91,14 @@ t_base_create_delete(_Config) -> ok. t_create_size_max(_Config) -> - emqx_trace:clear(), lists:map(fun(Seq) -> Name = list_to_binary("name" ++ integer_to_list(Seq)), - Trace = [{name, Name}, {type, <<"topic">>}, + Trace = [{name, Name}, {type, topic}, {topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], ok = emqx_trace:create(Trace) end, lists:seq(1, 30)), Trace31 = [{<<"name">>, <<"name31">>}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}], + {<<"type">>, topic}, {<<"topic">>, <<"/x/y/31">>}], {error, _} = emqx_trace:create(Trace31), ok = emqx_trace:delete(<<"name30">>), ok = emqx_trace:create(Trace31), @@ -100,54 +106,52 @@ t_create_size_max(_Config) -> ok. t_create_failed(_Config) -> - ok = emqx_trace:clear(), - UnknownField = [{<<"unknown">>, 12}], + Name = {<<"name">>, <<"test">>}, + UnknownField = [Name, {<<"unknown">>, 12}], {error, Reason1} = emqx_trace:create(UnknownField), - ?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)), + ?assertEqual(<<"type=[topic,clientid,ip_address] required">>, iolist_to_binary(Reason1)), - InvalidTopic = [{<<"topic">>, "#/#//"}], + InvalidTopic = [Name, {<<"topic">>, "#/#//"}, {<<"type">>, topic}], {error, Reason2} = emqx_trace:create(InvalidTopic), ?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)), - InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], + InvalidStart = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>}, + {<<"start_at">>, <<"2021-12-3:12">>}], {error, Reason3} = emqx_trace:create(InvalidStart), ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, iolist_to_binary(Reason3)), - InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}], + InvalidEnd = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>}, + {<<"end_at">>, <<"2021-12-3:12">>}], {error, Reason4} = emqx_trace:create(InvalidEnd), ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, iolist_to_binary(Reason4)), - {error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), - ?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)), + {error, Reason7} = emqx_trace:create([Name, {<<"type">>, clientid}]), + ?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason7)), InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, - {<<"type">>, <<"clientid">>}], + {<<"type">>, clientid}], {error, Reason9} = emqx_trace:create(InvalidPackets4), - ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)), + ?assertEqual(<<"Name should be ^[A-Za-z]+[A-Za-z0-9-_]*$">>, iolist_to_binary(Reason9)), ?assertEqual({error, "type=[topic,clientid,ip_address] required"}, emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])), - ?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"}, - emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), - ?assertEqual({error, "ip address: einval"}, - emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])), + emqx_trace:create([Name, {<<"type">>, ip_address}, + {<<"ip_address">>, <<"test-name">>}])), ok. t_create_default(_Config) -> - ok = emqx_trace:clear(), {error, "name required"} = emqx_trace:create([]), ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]), + {<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]), [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(), ok = emqx_trace:clear(), Trace = [ {<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, + {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}, {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>} @@ -156,25 +160,38 @@ t_create_default(_Config) -> Now = erlang:system_time(second), Trace2 = [ {<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, + {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}, {<<"start_at">>, to_rfc3339(Now + 10)}, {<<"end_at">>, to_rfc3339(Now + 3)} ], {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2), ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]), + {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}]), [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(), ?assertEqual(10 * 60, End - Start), ?assertEqual(true, Start - erlang:system_time(second) < 5), ok. -t_update_enable(_Config) -> +t_create_with_extra_fields(_Config) -> ok = emqx_trace:clear(), + Trace = [ + {<<"name">>, <<"test-name">>}, + {<<"type">>, topic}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"clientid">>, <<"dev001">>}, + {<<"ip_address">>, <<"127.0.0.1">>} + ], + ok = emqx_trace:create(Trace), + ?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}], + emqx_trace:list()), + ok. + +t_update_enable(_Config) -> Name = <<"test-name">>, Now = erlang:system_time(second), End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), - ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>}, + ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), [#emqx_trace{enable = Enable}] = emqx_trace:list(), ?assertEqual(Enable, true), @@ -192,16 +209,14 @@ t_update_enable(_Config) -> ok. t_load_state(_Config) -> - emqx_trace:clear(), - load(), Now = erlang:system_time(second), - Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, - {<<"end_at">>, to_rfc3339(Now + 2)}], - Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, + Running = #{name => <<"Running">>, type => topic, + topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1), + end_at => to_rfc3339(Now + 2)}, + Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"end_at">>, to_rfc3339(Now + 8)}], - Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>}, + Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"end_at">>, to_rfc3339(Now)}], ok = emqx_trace:create(Running), @@ -218,54 +233,48 @@ t_load_state(_Config) -> Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2), ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}], ?assertEqual(ExpectEnables2, lists:sort(Enables2)), - unload(), ok. t_client_event(_Config) -> application:set_env(emqx, allow_anonymous, true), - emqx_trace:clear(), ClientId = <<"client-test">>, - load(), Now = erlang:system_time(second), Start = to_rfc3339(Now), Name = <<"test_client_id_event">>, ok = emqx_trace:create([{<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(200), + {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), emqtt:ping(Client), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), ok = emqx_trace:create([{<<"name">>, <<"test_topic">>}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), - ok = filesync(Name, clientid), - ok = filesync(<<"test_topic">>, topic), + {<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]), ok = emqtt:disconnect(Client), - ok = filesync(Name, clientid), - ok = filesync(<<"test_topic">>, topic), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)), {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)), ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), ?assert(erlang:byte_size(Bin) > 0), ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), ?assert(erlang:byte_size(Bin3) > 0), - unload(), ok. t_get_log_filename(_Config) -> - ok = emqx_trace:clear(), - load(), Now = erlang:system_time(second), Start = calendar:system_time_to_rfc3339(Now), End = calendar:system_time_to_rfc3339(Now + 2), Name = <<"name1">>, Trace = [ {<<"name">>, Name}, - {<<"type">>, <<"ip_address">>}, + {<<"type">>, ip_address}, {<<"ip_address">>, <<"127.0.0.1">>}, {<<"start_at">>, list_to_binary(Start)}, {<<"end_at">>, list_to_binary(End)} @@ -275,7 +284,6 @@ t_get_log_filename(_Config) -> ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), ct:sleep(3000), ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), - unload(), ok. t_trace_file(_Config) -> @@ -291,22 +299,43 @@ t_trace_file(_Config) -> ok. t_download_log(_Config) -> - emqx_trace:clear(), - load(), ClientId = <<"client-test">>, Now = erlang:system_time(second), Start = to_rfc3339(Now), Name = <<"test_client_id">>, ok = emqx_trace:create([{<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], - ok = filesync(Name, clientid), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), ?assert(filelib:file_size(ZipFile) > 0), ok = emqtt:disconnect(Client), - unload(), + ok. + +t_find_closed_time(_Config) -> + DefaultMs = 60 * 15000, + Now = erlang:system_time(second), + Traces2 = [], + ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)), + Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1, + end_at = Now + 2, enable = false}], + ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)), + Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}], + ?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)), + Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2, + end_at = Now + 10, enable = true}], + ?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)), + Traces = [ + #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true}, + #emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true}, + #emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true}, + #emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true}, + #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true}, + #emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false} + ], + ?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)), ok. to_rfc3339(Second) -> diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index 950f0faf6..abe233b58 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -28,7 +28,7 @@ {password, <<"pass">>} ]). -all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address]. +all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8]. init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), @@ -85,7 +85,6 @@ t_trace_clientid(_Config) -> emqtt:connect(T), emqtt:publish(T, <<"a/b/c">>, <<"hi">>), emqtt:ping(T), - ok = filesync(<<"client">>, clientid), ok = filesync(<<"client2">>, clientid), ok = filesync(<<"client3">>, clientid), @@ -106,6 +105,22 @@ t_trace_clientid(_Config) -> emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()). +t_trace_clientid_utf8(_) -> + emqx_logger:set_log_level(debug), + + Utf8Id = <<"client 漢字編碼"/utf8>>, + ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"), + {ok, T} = emqtt:start_link([{clientid, Utf8Id}]), + emqtt:connect(T), + [begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)], + emqtt:ping(T), + + ok = filesync(Utf8Id, clientid), + ok = emqx_trace_handler:uninstall(clientid, Utf8Id), + emqtt:disconnect(T), + ?assertEqual([], emqx_trace_handler:running()), + ok. + t_trace_topic(_Config) -> {ok, T} = emqtt:start_link(?CLIENT), emqtt:connect(T), @@ -161,6 +176,7 @@ t_trace_ip_address(_Config) -> ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"), ok = filesync(<<"127.0.0.1">>, ip_address), ok = filesync(<<"192.168.1.1">>, ip_address), + %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/ip_trace_x.log")), ?assert(filelib:is_regular("tmp/ip_trace_y.log")), @@ -198,6 +214,7 @@ t_trace_ip_address(_Config) -> emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()). + filesync(Name, Type) -> ct:sleep(50), filesync(Name, Type, 3). diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl new file mode 100644 index 000000000..d6902d123 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -0,0 +1,408 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_trace). + +-behaviour(minirest_api). + +-include_lib("kernel/include/file.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ api_spec/0 + , fields/1 + , paths/0 + , schema/1 + , namespace/0 + ]). + +-export([ trace/2 + , delete_trace/2 + , update_trace/2 + , download_trace_log/2 + , stream_log_file/2 + ]). + +-export([validate_name/1]). + +%% for rpc +-export([read_trace_file/3 + , get_trace_size/0 + ]). + +-define(TO_BIN(_B_), iolist_to_binary(_B_)). +-define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}). + +namespace() -> "trace". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). + +paths() -> + ["/trace", "/trace/:name/stop", "/trace/:name/download", "/trace/:name/log", "/trace/:name"]. + + +schema("/trace") -> + #{ + 'operationId' => trace, + get => #{ + description => "List all trace", + responses => #{ + 200 => hoconsc:ref(trace) + } + }, + post => #{ + description => "Create new trace", + 'requestBody' => delete([status, log_size], fields(trace)), + responses => #{ + 200 => hoconsc:ref(trace) + } + }, + delete => #{ + description => "Clear all traces", + responses => #{ + 204 => <<"No Content">> + } + } + }; +schema("/trace/:name") -> + #{ + 'operationId' => delete_trace, + delete => #{ + description => "Delete trace by name", + parameters => [hoconsc:ref(name)], + responses => #{ + 204 => <<"Delete successfully">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>) + } + } + }; +schema("/trace/:name/stop") -> + #{ + 'operationId' => update_trace, + put => #{ + description => "Stop trace by name", + parameters => [hoconsc:ref(name)], + responses => #{ + 200 => hoconsc:ref(trace), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>) + } + } + }; +schema("/trace/:name/download") -> + #{ + 'operationId' => download_trace_log, + get => #{ + description => "Download trace log by name", + parameters => [hoconsc:ref(name)], + %% todo zip file octet-stream + responses => #{ + 200 => <<"TODO octet-stream">> + } + } + }; +schema("/trace/:name/log") -> + #{ + 'operationId' => stream_log_file, + get => #{ + description => "view trace log", + parameters => [ + hoconsc:ref(name), + hoconsc:ref(bytes), + hoconsc:ref(position), + hoconsc:ref(node) + ], + %% todo response data + responses => #{ + 200 => <<"TODO">> + } + } + }. + +fields(trace) -> + [ + {name, hoconsc:mk(binary(), + #{desc => "Unique and format by [a-zA-Z0-9-_]", + validator => fun ?MODULE:validate_name/1, + nullable => false, + example => <<"EMQX-TRACE-1">>})}, + {type, hoconsc:mk(hoconsc:enum([clientid, topic, ip_address]), + #{desc => """Filter type""", + nullable => false, + example => <<"clientid">>})}, + {topic, hoconsc:mk(binary(), + #{desc => """support mqtt wildcard topic.""", + nullable => true, + example => <<"/dev/#">>})}, + {clientid, hoconsc:mk(binary(), + #{desc => """mqtt clientid.""", + nullable => true, + example => <<"dev-001">>})}, + %% TODO add ip_address type in emqx_schema.erl + {ip_address, hoconsc:mk(binary(), + #{desc => "client ip address", + nullable => true, + example => <<"127.0.0.1">> + })}, + {status, hoconsc:mk(hoconsc:enum([running, stopped, waiting]), + #{desc => "trace status", + nullable => true, + example => running + })}, + {start_at, hoconsc:mk(binary(), + #{desc => "rfc3339 timestamp", + nullable => true, + example => <<"2021-11-04T18:17:38+08:00">> + })}, + {end_at, hoconsc:mk(binary(), + #{desc => "rfc3339 timestamp", + nullable => true, + example => <<"2021-11-05T18:17:38+08:00">> + })}, + {log_size, hoconsc:mk(hoconsc:array(map()), + #{desc => "trace log size", + example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], + nullable => true})} + ]; +fields(name) -> + [{name, hoconsc:mk(binary(), + #{ + desc => <<"[a-zA-Z0-9-_]">>, + example => <<"EMQX-TRACE-1">>, + in => path, + validator => fun ?MODULE:validate_name/1 + })} + ]; +fields(node) -> + [{node, hoconsc:mk(binary(), + #{ + desc => "Node name", + in => query, + nullable => true + })}]; +fields(bytes) -> + [{bytes, hoconsc:mk(integer(), + #{ + desc => "Maximum number of bytes to store in request", + in => query, + nullable => true, + default => 1000 + })}]; +fields(position) -> + [{position, hoconsc:mk(integer(), + #{ + desc => "Offset from the current trace position.", + in => query, + nullable => true, + default => 0 + })}]. + +-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). + +validate_name(Name) -> + NameLen = byte_size(Name), + case NameLen > 0 andalso NameLen =< 256 of + true -> + case re:run(Name, ?NAME_RE) of + nomatch -> {error, "Name should be " ?NAME_RE}; + _ -> ok + end; + false -> {error, "Name Length must =< 256"} + end. + +delete(Keys, Fields) -> + lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys). + +trace(get, _Params) -> + case emqx_trace:list() of + [] -> {200, []}; + List0 -> + List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, + emqx_trace:format(List0)), + Nodes = mria_mnesia:running_nodes(), + TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000), + AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), + Now = erlang:system_time(second), + Traces = + lists:map(fun(Trace = #{name := Name, start_at := Start, + end_at := End, enable := Enable, type := Type, filter := Filter}) -> + FileName = emqx_trace:filename(Name, Start), + LogSize = collect_file_size(Nodes, FileName, AllFileSize), + Trace0 = maps:without([enable, filter], Trace), + Trace0#{log_size => LogSize + , Type => iolist_to_binary(Filter) + , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) + , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) + , status => status(Enable, Start, End, Now) + } + end, List), + {200, Traces} + end; +trace(post, #{body := Param}) -> + case emqx_trace:create(Param) of + ok -> {200}; + {error, {already_existed, Name}} -> + {400, #{ + code => 'ALREADY_EXISTED', + message => ?TO_BIN([Name, " Already Exists"]) + }}; + {error, {duplicate_condition, Name}} -> + {400, #{ + code => 'DUPLICATE_CONDITION', + message => ?TO_BIN([Name, " Duplication Condition"]) + }}; + {error, Reason} -> + {400, #{ + code => 'INCORRECT_PARAMS', + message => ?TO_BIN(Reason) + }} + end; +trace(delete, _Param) -> + ok = emqx_trace:clear(), + {200}. + +delete_trace(delete, #{bindings := #{name := Name}}) -> + case emqx_trace:delete(Name) of + ok -> {200}; + {error, not_found} -> ?NOT_FOUND(Name) + end. + +update_trace(put, #{bindings := #{name := Name}}) -> + case emqx_trace:update(Name, false) of + ok -> {200, #{enable => false, name => Name}}; + {error, not_found} -> ?NOT_FOUND(Name) + end. + +%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. +%% cowboy_compress_h will auto encode gzip format. +download_trace_log(get, #{bindings := #{name := Name}}) -> + case emqx_trace:get_trace_filename(Name) of + {ok, TraceLog} -> + TraceFiles = collect_trace_file(TraceLog), + ZipDir = emqx_trace:zip_dir(), + Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), + ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", + {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), + emqx_trace:delete_files_after_send(ZipFileName, Zips), + {200, ZipFile}; + {error, not_found} -> ?NOT_FOUND(Name) + end. + +group_trace_file(ZipDir, TraceLog, TraceFiles) -> + lists:foldl(fun(Res, Acc) -> + case Res of + {ok, Node, Bin} -> + ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, + case file:write_file(ZipName, Bin) of + ok -> [Node ++ "-" ++ TraceLog | Acc]; + _ -> Acc + end; + {error, Node, Reason} -> + ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), + Acc + end + end, [], TraceFiles). + +collect_trace_file(TraceLog) -> + cluster_call(emqx_trace, trace_file, [TraceLog], 60000). + +cluster_call(Mod, Fun, Args, Timeout) -> + Nodes = mria_mnesia:running_nodes(), + {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), + BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]), + GoodRes. + +stream_log_file(get, #{bindings := #{name := Name}, query_string := Query} = T) -> + Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())), + Position = maps:get(<<"position">>, Query, 0), + Bytes = maps:get(<<"bytes">>, Query, 1000), + logger:error("~p", [T]), + case to_node(Node0) of + {ok, Node} -> + case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + {ok, Bin} -> + Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, + {200, #{meta => Meta, items => Bin}}; + {eof, Size} -> + Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, + {200, #{meta => Meta, items => <<"">>}}; + {error, Reason} -> + logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]), + {400, #{code => 'READ_FILE_ERROR', message => Reason}}; + {badrpc, nodedown} -> + {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}} + end; + {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}} + end. + +get_trace_size() -> + TraceDir = emqx_trace:trace_dir(), + Node = node(), + case file:list_dir(TraceDir) of + {ok, AllFiles} -> + lists:foldl(fun(File, Acc) -> + FullFileName = filename:join(TraceDir, File), + Acc#{{Node, File} => filelib:file_size(FullFileName)} + end, #{}, lists:delete("zip", AllFiles)); + _ -> #{} + end. + +%% this is an rpc call for stream_log_file/2 +read_trace_file(Name, Position, Limit) -> + case emqx_trace:get_trace_filename(Name) of + {error, _} = Error -> Error; + {ok, TraceFile} -> + TraceDir = emqx_trace:trace_dir(), + TracePath = filename:join([TraceDir, TraceFile]), + read_file(TracePath, Position, Limit) + end. + +read_file(Path, Offset, Bytes) -> + case file:open(Path, [read, raw, binary]) of + {ok, IoDevice} -> + try + _ = case Offset of + 0 -> ok; + _ -> file:position(IoDevice, {bof, Offset}) + end, + case file:read(IoDevice, Bytes) of + {ok, Bin} -> {ok, Bin}; + {error, Reason} -> {error, Reason}; + eof -> + {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), + {eof, Size} + end + after + file:close(IoDevice) + end; + {error, Reason} -> {error, Reason} + end. + +to_node(Node) -> + try {ok, binary_to_existing_atom(Node)} + catch _:_ -> + {error, not_found} + end. + +collect_file_size(Nodes, FileName, AllFiles) -> + lists:foldl(fun(Node, Acc) -> + Size = maps:get({Node, FileName}, AllFiles, 0), + Acc#{Node => Size} + end, #{}, Nodes). + +status(false, _Start, _End, _Now) -> <<"stopped">>; +status(true, Start, _End, Now) when Now < Start -> <<"waiting">>; +status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; +status(true, _Start, _End, _Now) -> <<"running">>. diff --git a/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl new file mode 100644 index 000000000..67fc6c8c0 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl @@ -0,0 +1,190 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_trace_api_SUITE). + +%% API +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-define(HOST, "http://127.0.0.1:18083/"). +-define(API_VERSION, "v5"). +-define(BASE_PATH, "api"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_mgmt_api_test_util:init_suite(), + Config. + +end_per_suite(_) -> + emqx_mgmt_api_test_util:end_suite(). + +t_http_test(_Config) -> + emqx_trace:clear(), + load(), + Header = auth_header_(), + %% list + {ok, Empty} = request_api(get, api_path("trace"), Header), + ?assertEqual([], json(Empty)), + %% create + ErrorTrace = #{}, + {error, {"HTTP/1.1", 400, "Bad Request"}, Body} = + request_api(post, api_path("trace"), Header, ErrorTrace), + ?assertEqual( + #{ + <<"code">> => <<"BAD_REQUEST">>, + <<"message">> => <<"name : not_nullable">> + }, json(Body)), + + Name = <<"test-name">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/z">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(<<>>, Create), + + {ok, List} = request_api(get, api_path("trace"), Header), + [Data] = json(List), + ?assertEqual(Name, maps:get(<<"name">>, Data)), + + %% update + {ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}), + ?assertEqual(#{<<"enable">> => false, + <<"name">> => <<"test-name">>}, json(Update)), + + {ok, List1} = request_api(get, api_path("trace"), Header), + [Data1] = json(List1), + Node = atom_to_binary(node()), + ?assertMatch(#{ + <<"status">> := <<"stopped">>, + <<"name">> := <<"test-name">>, + <<"log_size">> := #{Node := _}, + <<"start_at">> := _, + <<"end_at">> := _, + <<"type">> := <<"topic">>, + <<"topic">> := <<"/x/y/z">> + }, Data1), + + %% delete + {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(<<>>, Delete), + + {error, {"HTTP/1.1", 404, "Not Found"}, DeleteNotFound} + = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>, + <<"message">> => <<"test-name NOT FOUND">>}, json(DeleteNotFound)), + + {ok, List2} = request_api(get, api_path("trace"), Header), + ?assertEqual([], json(List2)), + + %% clear + {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(<<>>, Create1), + + {ok, Clear} = request_api(delete, api_path("trace"), Header), + ?assertEqual(<<>>, Clear), + + unload(), + ok. + +t_stream_log(_Config) -> + application:set_env(emqx, allow_anonymous, true), + emqx_trace:clear(), + load(), + ClientId = <<"client-stream">>, + Now = erlang:system_time(second), + Name = <<"test_stream_log">>, + Start = to_rfc3339(Now - 10), + ok = emqx_trace:create(#{<<"name">> => Name, + <<"type">> => clientid, <<"clientid">> => ClientId, <<"start_at">> => Start}), + ct:sleep(200), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)], + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]), + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ct:sleep(200), + File = emqx_trace:log_file(Name, Now), + ct:pal("FileName: ~p", [File]), + {ok, FileBin} = file:read_file(File), + ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), + Header = auth_header_(), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header), + #{<<"meta">> := Meta, <<"items">> := Bin} = json(Binary), + ?assertEqual(10, byte_size(Bin)), + ?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta), + Path = api_path("trace/test_stream_log/log?position=20&bytes=10"), + {ok, Binary1} = request_api(get, Path, Header), + #{<<"meta">> := Meta1, <<"items">> := Bin1} = json(Binary1), + ?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1), + ?assertEqual(10, byte_size(Bin1)), + unload(), + ok. + +to_rfc3339(Second) -> + list_to_binary(calendar:system_time_to_rfc3339(Second)). + +auth_header_() -> + auth_header_("admin", "public"). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), + {"Authorization", "Basic " ++ Encoded}. + +request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). + +request_api(Method, Url, Auth, Body) -> + Request = {Url, [Auth], "application/json", emqx_json:encode(Body)}, + do_request_api(Method, Request). + +do_request_api(Method, Request) -> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {error, {shutdown, server_closed}} -> + {error, server_closed}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} + when Code =:= 200 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _Header, Body}} -> + {error, Reason, Body} + end. + +api_path(Path) -> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]). + +json(Data) -> + {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. + +load() -> + emqx_trace:start_link(). + +unload() -> + gen_server:stop(emqx_trace).