feat(trace): trace http api schema
This commit is contained in:
parent
e0fd6d553e
commit
89d904b7ef
|
@ -20,10 +20,8 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
|
||||||
-export([mnesia/1]).
|
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
-export([ publish/1
|
-export([ publish/1
|
||||||
, subscribe/3
|
, subscribe/3
|
||||||
|
@ -54,14 +52,16 @@
|
||||||
-define(MAX_SIZE, 30).
|
-define(MAX_SIZE, 30).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([log_file/2]).
|
-export([ log_file/2
|
||||||
|
, find_closest_time/2
|
||||||
|
]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-export_type([ip_address/0]).
|
-export_type([ip_address/0]).
|
||||||
-type ip_address() :: string().
|
-type ip_address() :: string().
|
||||||
|
|
||||||
-record(?TRACE,
|
-record(?TRACE, {
|
||||||
{ name :: binary() | undefined | '_'
|
name :: binary() | undefined | '_'
|
||||||
, type :: clientid | topic | ip_address | undefined | '_'
|
, type :: clientid | topic | ip_address | undefined | '_'
|
||||||
, filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
|
, filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
|
||||||
, enable = true :: boolean() | '_'
|
, enable = true :: boolean() | '_'
|
||||||
|
@ -279,24 +279,22 @@ stop_all_trace_handler() ->
|
||||||
lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
|
lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
|
||||||
emqx_trace_handler:running()).
|
emqx_trace_handler:running()).
|
||||||
get_enable_trace() ->
|
get_enable_trace() ->
|
||||||
{atomic, Traces} =
|
transaction(fun() ->
|
||||||
mria:transaction(?COMMON_SHARD, fun() ->
|
|
||||||
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
||||||
end),
|
end).
|
||||||
Traces.
|
|
||||||
|
|
||||||
find_closest_time(Traces, Now) ->
|
find_closest_time(Traces, Now) ->
|
||||||
Sec =
|
Sec =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#?TRACE{start_at = Start, end_at = End}, Closest)
|
fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
|
||||||
when Start >= Now andalso Now < End -> %% running
|
min(closest(End, Now, Closest), closest(Start, Now, Closest));
|
||||||
min(End - Now, Closest);
|
(_, Closest) -> Closest
|
||||||
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
|
|
||||||
min(Now - Start, Closest);
|
|
||||||
(_, Closest) -> Closest %% finished
|
|
||||||
end, 60 * 15, Traces),
|
end, 60 * 15, Traces),
|
||||||
timer:seconds(Sec).
|
timer:seconds(Sec).
|
||||||
|
|
||||||
|
closest(Time, Now, Closest) when Now >= Time -> Closest;
|
||||||
|
closest(Time, Now, Closest) -> min(Time - Now, Closest).
|
||||||
|
|
||||||
disable_finished([]) -> ok;
|
disable_finished([]) -> ok;
|
||||||
disable_finished(Traces) ->
|
disable_finished(Traces) ->
|
||||||
transaction(fun() ->
|
transaction(fun() ->
|
||||||
|
@ -367,29 +365,31 @@ classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
|
||||||
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
||||||
|
|
||||||
to_trace(TraceParam) ->
|
to_trace(TraceParam) ->
|
||||||
case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of
|
case to_trace(ensure_map(TraceParam), #?TRACE{}) of
|
||||||
{error, Reason} -> {error, Reason};
|
{error, Reason} -> {error, Reason};
|
||||||
{ok, #?TRACE{name = undefined}} ->
|
{ok, #?TRACE{name = undefined}} ->
|
||||||
{error, "name required"};
|
{error, "name required"};
|
||||||
{ok, #?TRACE{type = undefined}} ->
|
{ok, #?TRACE{type = undefined}} ->
|
||||||
{error, "type=[topic,clientid,ip_address] required"};
|
{error, "type=[topic,clientid,ip_address] required"};
|
||||||
{ok, #?TRACE{filter = undefined}} ->
|
{ok, TraceRec0 = #?TRACE{}} ->
|
||||||
{error, "topic/clientid/ip_address filter required"};
|
|
||||||
{ok, TraceRec0} ->
|
|
||||||
case fill_default(TraceRec0) of
|
case fill_default(TraceRec0) of
|
||||||
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
||||||
{error, "failed by start_at >= end_at"};
|
{error, "failed by start_at >= end_at"};
|
||||||
TraceRec -> {ok, TraceRec}
|
TraceRec ->
|
||||||
|
{ok, TraceRec}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_proplists(#{} = Trace) -> maps:to_list(Trace);
|
ensure_map(#{} = Trace) ->
|
||||||
ensure_proplists(Trace) when is_list(Trace) ->
|
maps:fold(fun(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
|
||||||
|
(K, V, Acc) when is_atom(K) -> Acc#{K => V}
|
||||||
|
end, #{}, Trace);
|
||||||
|
ensure_map(Trace) when is_list(Trace) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc];
|
fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
|
||||||
({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc];
|
({K, V}, Acc) when is_atom(K) -> Acc#{K => V};
|
||||||
(_, Acc) -> Acc
|
(_, Acc) -> Acc
|
||||||
end, [], Trace).
|
end, #{}, Trace).
|
||||||
|
|
||||||
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
||||||
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
||||||
|
@ -397,49 +397,47 @@ fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
||||||
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
|
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
|
||||||
fill_default(Trace) -> Trace.
|
fill_default(Trace) -> Trace.
|
||||||
|
|
||||||
to_trace([], Rec) -> {ok, Rec};
|
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
|
||||||
to_trace([{name, Name} | Trace], Rec) ->
|
|
||||||
case io_lib:printable_unicode_list(unicode:characters_to_list(Name, utf8)) of
|
to_trace(#{name := Name} = Trace, Rec) ->
|
||||||
true ->
|
case re:run(Name, ?NAME_RE) of
|
||||||
case binary:match(Name, [<<"/">>], []) of
|
nomatch -> {error, "Name should be " ?NAME_RE};
|
||||||
nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
|
_ -> to_trace(maps:remove(name, Trace), Rec#?TRACE{name = Name})
|
||||||
_ -> {error, "name cannot contain /"}
|
|
||||||
end;
|
end;
|
||||||
false -> {error, "name must printable unicode"}
|
to_trace(#{type := clientid, clientid := Filter} = Trace, Rec) ->
|
||||||
|
Trace0 = maps:without([type, clientid], Trace),
|
||||||
|
to_trace(Trace0, Rec#?TRACE{type = clientid, filter = Filter});
|
||||||
|
to_trace(#{type := topic, topic := Filter} = Trace, Rec) ->
|
||||||
|
case validate_topic(Filter) of
|
||||||
|
ok ->
|
||||||
|
Trace0 = maps:without([type, topic], Trace),
|
||||||
|
to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter});
|
||||||
|
Error -> Error
|
||||||
end;
|
end;
|
||||||
to_trace([{type, Type} | Trace], Rec) ->
|
to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
|
||||||
case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of
|
case validate_ip_address(Filter) of
|
||||||
true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
|
ok ->
|
||||||
false -> {error, "incorrect type: only support clientid/topic/ip_address"}
|
Trace0 = maps:without([type, ip_address], Trace),
|
||||||
|
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter});
|
||||||
|
Error -> Error
|
||||||
end;
|
end;
|
||||||
to_trace([{topic, Topic} | Trace], Rec) ->
|
to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};
|
||||||
case validate_topic(Topic) of
|
to_trace(#{start_at := StartAt} = Trace, Rec) ->
|
||||||
ok -> to_trace(Trace, Rec#?TRACE{filter = Topic});
|
|
||||||
{error, Reason} -> {error, Reason}
|
|
||||||
end;
|
|
||||||
to_trace([{clientid, ClientId} | Trace], Rec) ->
|
|
||||||
to_trace(Trace, Rec#?TRACE{filter = ClientId});
|
|
||||||
to_trace([{ip_address, IP} | Trace], Rec) ->
|
|
||||||
case inet:parse_address(binary_to_list(IP)) of
|
|
||||||
{ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)});
|
|
||||||
{error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
|
|
||||||
end;
|
|
||||||
to_trace([{start_at, StartAt} | Trace], Rec) ->
|
|
||||||
case to_system_second(StartAt) of
|
case to_system_second(StartAt) of
|
||||||
{ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
|
{ok, Sec} -> to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
to_trace([{end_at, EndAt} | Trace], Rec) ->
|
to_trace(#{end_at := EndAt} = Trace, Rec) ->
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
case to_system_second(EndAt) of
|
case to_system_second(EndAt) of
|
||||||
{ok, Sec} when Sec > Now ->
|
{ok, Sec} when Sec > Now ->
|
||||||
to_trace(Trace, Rec#?TRACE{end_at = Sec});
|
to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec});
|
||||||
{ok, _Sec} ->
|
{ok, _Sec} ->
|
||||||
{error, "end_at time has already passed"};
|
{error, "end_at time has already passed"};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
|
to_trace(_, Rec) -> {ok, Rec}.
|
||||||
|
|
||||||
validate_topic(TopicName) ->
|
validate_topic(TopicName) ->
|
||||||
try emqx_topic:validate(filter, TopicName) of
|
try emqx_topic:validate(filter, TopicName) of
|
||||||
|
@ -448,11 +446,17 @@ validate_topic(TopicName) ->
|
||||||
error:Error ->
|
error:Error ->
|
||||||
{error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
|
{error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
|
||||||
end.
|
end.
|
||||||
|
validate_ip_address(IP) ->
|
||||||
|
case inet:parse_address(binary_to_list(IP)) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
{error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
|
||||||
|
end.
|
||||||
|
|
||||||
to_system_second(At) ->
|
to_system_second(At) ->
|
||||||
try
|
try
|
||||||
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
|
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
|
||||||
{ok, Sec}
|
Now = erlang:system_time(second),
|
||||||
|
{ok, erlang:max(Now, Sec)}
|
||||||
catch error: {badmatch, _} ->
|
catch error: {badmatch, _} ->
|
||||||
{error, ["The rfc3339 specification not satisfied: ", At]}
|
{error, ["The rfc3339 specification not satisfied: ", At]}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -43,22 +43,6 @@
|
||||||
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
|
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-define(FORMAT,
|
|
||||||
{logger_formatter, #{
|
|
||||||
template => [
|
|
||||||
time, " [", level, "] ",
|
|
||||||
{clientid,
|
|
||||||
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
|
|
||||||
[{peername, [peername, " "], []}]
|
|
||||||
},
|
|
||||||
msg, "\n"
|
|
||||||
],
|
|
||||||
single_line => false,
|
|
||||||
max_size => unlimited,
|
|
||||||
depth => unlimited
|
|
||||||
}}
|
|
||||||
).
|
|
||||||
|
|
||||||
-define(CONFIG(_LogFile_), #{
|
-define(CONFIG(_LogFile_), #{
|
||||||
type => halt,
|
type => halt,
|
||||||
file => _LogFile_,
|
file => _LogFile_,
|
||||||
|
@ -159,8 +143,9 @@ filter_ip_address(_Log, _ExpectId) -> ignore.
|
||||||
|
|
||||||
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
|
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
|
||||||
HandlerId = handler_id(Name, Type),
|
HandlerId = handler_id(Name, Type),
|
||||||
Config = #{ level => Level,
|
Config = #{
|
||||||
formatter => ?FORMAT,
|
level => Level,
|
||||||
|
formatter => formatter(Who),
|
||||||
filter_default => stop,
|
filter_default => stop,
|
||||||
filters => filters(Who),
|
filters => filters(Who),
|
||||||
config => ?CONFIG(LogFile)
|
config => ?CONFIG(LogFile)
|
||||||
|
@ -176,6 +161,31 @@ filters(#{type := topic, filter := Filter, name := Name}) ->
|
||||||
filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
||||||
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
||||||
|
|
||||||
|
formatter(#{type := Type}) ->
|
||||||
|
{logger_formatter,
|
||||||
|
#{
|
||||||
|
template => template(Type),
|
||||||
|
single_line => false,
|
||||||
|
max_size => unlimited,
|
||||||
|
depth => unlimited
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% Don't log clientid since clientid only supports exact match, all client ids are the same.
|
||||||
|
%% if clientid is not latin characters. the logger_formatter restricts the output must be `~tp`
|
||||||
|
%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read.
|
||||||
|
template(clientid) ->
|
||||||
|
[time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
|
||||||
|
%% TODO better format when clientid is utf8.
|
||||||
|
template(_) ->
|
||||||
|
[time, " [", level, "] ",
|
||||||
|
{clientid,
|
||||||
|
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
|
||||||
|
[{peername, [peername, " "], []}]
|
||||||
|
},
|
||||||
|
msg, "\n"
|
||||||
|
].
|
||||||
|
|
||||||
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
||||||
Init = #{id => Id, level => Level, dst => Dst},
|
Init = #{id => Id, level => Level, dst => Dst},
|
||||||
case Filters of
|
case Filters of
|
||||||
|
@ -209,7 +219,7 @@ do_handler_id(Name, Type) ->
|
||||||
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
|
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
|
||||||
ensure_bin(Bin) when is_binary(Bin) -> Bin.
|
ensure_bin(Bin) when is_binary(Bin) -> Bin.
|
||||||
|
|
||||||
ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
ensure_list(Bin) when is_binary(Bin) -> unicode:characters_to_list(Bin, utf8);
|
||||||
ensure_list(List) when is_list(List) -> List.
|
ensure_list(List) when is_list(List) -> List.
|
||||||
|
|
||||||
show_prompts(ok, Who, Msg) ->
|
show_prompts(ok, Who, Msg) ->
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-import(emqx_trace_handler_SUITE, [filesync/2]).
|
|
||||||
-record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}).
|
-record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -33,15 +33,22 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:load(emqx_plugin_libs),
|
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([]).
|
emqx_common_test_helpers:stop_apps([]).
|
||||||
|
|
||||||
t_base_create_delete(_Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
load(),
|
||||||
ok = emqx_trace:clear(),
|
ok = emqx_trace:clear(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_) ->
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_base_create_delete(_Config) ->
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
End = to_rfc3339(Now + 30 * 60),
|
End = to_rfc3339(Now + 30 * 60),
|
||||||
|
@ -49,7 +56,7 @@ t_base_create_delete(_Config) ->
|
||||||
ClientId = <<"test-device">>,
|
ClientId = <<"test-device">>,
|
||||||
Trace = #{
|
Trace = #{
|
||||||
name => Name,
|
name => Name,
|
||||||
type => <<"clientid">>,
|
type => clientid,
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
start_at => Start,
|
start_at => Start,
|
||||||
end_at => End
|
end_at => End
|
||||||
|
@ -84,15 +91,14 @@ t_base_create_delete(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_size_max(_Config) ->
|
t_create_size_max(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
lists:map(fun(Seq) ->
|
lists:map(fun(Seq) ->
|
||||||
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||||
Trace = [{name, Name}, {type, <<"topic">>},
|
Trace = [{name, Name}, {type, topic},
|
||||||
{topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
{topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||||
ok = emqx_trace:create(Trace)
|
ok = emqx_trace:create(Trace)
|
||||||
end, lists:seq(1, 30)),
|
end, lists:seq(1, 30)),
|
||||||
Trace31 = [{<<"name">>, <<"name31">>},
|
Trace31 = [{<<"name">>, <<"name31">>},
|
||||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}],
|
{<<"type">>, topic}, {<<"topic">>, <<"/x/y/31">>}],
|
||||||
{error, _} = emqx_trace:create(Trace31),
|
{error, _} = emqx_trace:create(Trace31),
|
||||||
ok = emqx_trace:delete(<<"name30">>),
|
ok = emqx_trace:delete(<<"name30">>),
|
||||||
ok = emqx_trace:create(Trace31),
|
ok = emqx_trace:create(Trace31),
|
||||||
|
@ -100,54 +106,52 @@ t_create_size_max(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_failed(_Config) ->
|
t_create_failed(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
Name = {<<"name">>, <<"test">>},
|
||||||
UnknownField = [{<<"unknown">>, 12}],
|
UnknownField = [Name, {<<"unknown">>, 12}],
|
||||||
{error, Reason1} = emqx_trace:create(UnknownField),
|
{error, Reason1} = emqx_trace:create(UnknownField),
|
||||||
?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
|
?assertEqual(<<"type=[topic,clientid,ip_address] required">>, iolist_to_binary(Reason1)),
|
||||||
|
|
||||||
InvalidTopic = [{<<"topic">>, "#/#//"}],
|
InvalidTopic = [Name, {<<"topic">>, "#/#//"}, {<<"type">>, topic}],
|
||||||
{error, Reason2} = emqx_trace:create(InvalidTopic),
|
{error, Reason2} = emqx_trace:create(InvalidTopic),
|
||||||
?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||||
|
|
||||||
InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
|
InvalidStart = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>},
|
||||||
|
{<<"start_at">>, <<"2021-12-3:12">>}],
|
||||||
{error, Reason3} = emqx_trace:create(InvalidStart),
|
{error, Reason3} = emqx_trace:create(InvalidStart),
|
||||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||||
iolist_to_binary(Reason3)),
|
iolist_to_binary(Reason3)),
|
||||||
|
|
||||||
InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}],
|
InvalidEnd = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>},
|
||||||
|
{<<"end_at">>, <<"2021-12-3:12">>}],
|
||||||
{error, Reason4} = emqx_trace:create(InvalidEnd),
|
{error, Reason4} = emqx_trace:create(InvalidEnd),
|
||||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||||
iolist_to_binary(Reason4)),
|
iolist_to_binary(Reason4)),
|
||||||
|
|
||||||
{error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
|
{error, Reason7} = emqx_trace:create([Name, {<<"type">>, clientid}]),
|
||||||
?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)),
|
?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason7)),
|
||||||
|
|
||||||
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
|
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
|
||||||
{<<"type">>, <<"clientid">>}],
|
{<<"type">>, clientid}],
|
||||||
{error, Reason9} = emqx_trace:create(InvalidPackets4),
|
{error, Reason9} = emqx_trace:create(InvalidPackets4),
|
||||||
?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
|
?assertEqual(<<"Name should be ^[A-Za-z]+[A-Za-z0-9-_]*$">>, iolist_to_binary(Reason9)),
|
||||||
|
|
||||||
?assertEqual({error, "type=[topic,clientid,ip_address] required"},
|
?assertEqual({error, "type=[topic,clientid,ip_address] required"},
|
||||||
emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
|
emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
|
||||||
|
|
||||||
?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"},
|
|
||||||
emqx_trace:create([{<<"name">>, <<"test-name">>},
|
|
||||||
{<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
|
||||||
|
|
||||||
?assertEqual({error, "ip address: einval"},
|
?assertEqual({error, "ip address: einval"},
|
||||||
emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])),
|
emqx_trace:create([Name, {<<"type">>, ip_address},
|
||||||
|
{<<"ip_address">>, <<"test-name">>}])),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_default(_Config) ->
|
t_create_default(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
{error, "name required"} = emqx_trace:create([]),
|
{error, "name required"} = emqx_trace:create([]),
|
||||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
|
{<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]),
|
||||||
[#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
|
[#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
|
||||||
ok = emqx_trace:clear(),
|
ok = emqx_trace:clear(),
|
||||||
Trace = [
|
Trace = [
|
||||||
{<<"name">>, <<"test-name">>},
|
{<<"name">>, <<"test-name">>},
|
||||||
{<<"type">>, <<"topic">>},
|
{<<"type">>, topic},
|
||||||
{<<"topic">>, <<"/x/y/z">>},
|
{<<"topic">>, <<"/x/y/z">>},
|
||||||
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
||||||
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
|
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
|
||||||
|
@ -156,25 +160,38 @@ t_create_default(_Config) ->
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Trace2 = [
|
Trace2 = [
|
||||||
{<<"name">>, <<"test-name">>},
|
{<<"name">>, <<"test-name">>},
|
||||||
{<<"type">>, <<"topic">>},
|
{<<"type">>, topic},
|
||||||
{<<"topic">>, <<"/x/y/z">>},
|
{<<"topic">>, <<"/x/y/z">>},
|
||||||
{<<"start_at">>, to_rfc3339(Now + 10)},
|
{<<"start_at">>, to_rfc3339(Now + 10)},
|
||||||
{<<"end_at">>, to_rfc3339(Now + 3)}
|
{<<"end_at">>, to_rfc3339(Now + 3)}
|
||||||
],
|
],
|
||||||
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
|
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
|
||||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]),
|
{<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||||
[#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
|
[#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
|
||||||
?assertEqual(10 * 60, End - Start),
|
?assertEqual(10 * 60, End - Start),
|
||||||
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_update_enable(_Config) ->
|
t_create_with_extra_fields(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
ok = emqx_trace:clear(),
|
||||||
|
Trace = [
|
||||||
|
{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"type">>, topic},
|
||||||
|
{<<"topic">>, <<"/x/y/z">>},
|
||||||
|
{<<"clientid">>, <<"dev001">>},
|
||||||
|
{<<"ip_address">>, <<"127.0.0.1">>}
|
||||||
|
],
|
||||||
|
ok = emqx_trace:create(Trace),
|
||||||
|
?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}],
|
||||||
|
emqx_trace:list()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_update_enable(_Config) ->
|
||||||
Name = <<"test-name">>,
|
Name = <<"test-name">>,
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||||
ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic},
|
||||||
{<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
{<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||||
[#emqx_trace{enable = Enable}] = emqx_trace:list(),
|
[#emqx_trace{enable = Enable}] = emqx_trace:list(),
|
||||||
?assertEqual(Enable, true),
|
?assertEqual(Enable, true),
|
||||||
|
@ -192,16 +209,14 @@ t_update_enable(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_load_state(_Config) ->
|
t_load_state(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
Running = #{name => <<"Running">>, type => topic,
|
||||||
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1),
|
||||||
{<<"end_at">>, to_rfc3339(Now + 2)}],
|
end_at => to_rfc3339(Now + 2)},
|
||||||
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
|
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, topic},
|
||||||
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
|
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
|
||||||
{<<"end_at">>, to_rfc3339(Now + 8)}],
|
{<<"end_at">>, to_rfc3339(Now + 8)}],
|
||||||
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>},
|
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic},
|
||||||
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
|
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
|
||||||
{<<"end_at">>, to_rfc3339(Now)}],
|
{<<"end_at">>, to_rfc3339(Now)}],
|
||||||
ok = emqx_trace:create(Running),
|
ok = emqx_trace:create(Running),
|
||||||
|
@ -218,54 +233,48 @@ t_load_state(_Config) ->
|
||||||
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
||||||
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
||||||
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_client_event(_Config) ->
|
t_client_event(_Config) ->
|
||||||
application:set_env(emqx, allow_anonymous, true),
|
application:set_env(emqx, allow_anonymous, true),
|
||||||
emqx_trace:clear(),
|
|
||||||
ClientId = <<"client-test">>,
|
ClientId = <<"client-test">>,
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
Name = <<"test_client_id_event">>,
|
Name = <<"test_client_id_event">>,
|
||||||
ok = emqx_trace:create([{<<"name">>, Name},
|
ok = emqx_trace:create([{<<"name">>, Name},
|
||||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
{<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||||
ct:sleep(200),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
emqtt:ping(Client),
|
emqtt:ping(Client),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
||||||
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
||||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
{<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
||||||
ok = filesync(Name, clientid),
|
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
|
||||||
ok = filesync(<<"test_topic">>, topic),
|
|
||||||
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
ok = filesync(Name, clientid),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
ok = filesync(<<"test_topic">>, topic),
|
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
|
||||||
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
||||||
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
||||||
?assert(erlang:byte_size(Bin) > 0),
|
?assert(erlang:byte_size(Bin) > 0),
|
||||||
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
||||||
?assert(erlang:byte_size(Bin3) > 0),
|
?assert(erlang:byte_size(Bin3) > 0),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_get_log_filename(_Config) ->
|
t_get_log_filename(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = calendar:system_time_to_rfc3339(Now),
|
Start = calendar:system_time_to_rfc3339(Now),
|
||||||
End = calendar:system_time_to_rfc3339(Now + 2),
|
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||||
Name = <<"name1">>,
|
Name = <<"name1">>,
|
||||||
Trace = [
|
Trace = [
|
||||||
{<<"name">>, Name},
|
{<<"name">>, Name},
|
||||||
{<<"type">>, <<"ip_address">>},
|
{<<"type">>, ip_address},
|
||||||
{<<"ip_address">>, <<"127.0.0.1">>},
|
{<<"ip_address">>, <<"127.0.0.1">>},
|
||||||
{<<"start_at">>, list_to_binary(Start)},
|
{<<"start_at">>, list_to_binary(Start)},
|
||||||
{<<"end_at">>, list_to_binary(End)}
|
{<<"end_at">>, list_to_binary(End)}
|
||||||
|
@ -275,7 +284,6 @@ t_get_log_filename(_Config) ->
|
||||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
ct:sleep(3000),
|
ct:sleep(3000),
|
||||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_trace_file(_Config) ->
|
t_trace_file(_Config) ->
|
||||||
|
@ -291,22 +299,43 @@ t_trace_file(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_download_log(_Config) ->
|
t_download_log(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
ClientId = <<"client-test">>,
|
ClientId = <<"client-test">>,
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
Name = <<"test_client_id">>,
|
Name = <<"test_client_id">>,
|
||||||
ok = emqx_trace:create([{<<"name">>, Name},
|
ok = emqx_trace:create([{<<"name">>, Name},
|
||||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
{<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||||
ok = filesync(Name, clientid),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
|
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
|
||||||
?assert(filelib:file_size(ZipFile) > 0),
|
?assert(filelib:file_size(ZipFile) > 0),
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
unload(),
|
ok.
|
||||||
|
|
||||||
|
t_find_closed_time(_Config) ->
|
||||||
|
DefaultMs = 60 * 15000,
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Traces2 = [],
|
||||||
|
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)),
|
||||||
|
Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1,
|
||||||
|
end_at = Now + 2, enable = false}],
|
||||||
|
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)),
|
||||||
|
Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}],
|
||||||
|
?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)),
|
||||||
|
Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2,
|
||||||
|
end_at = Now + 10, enable = true}],
|
||||||
|
?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)),
|
||||||
|
Traces = [
|
||||||
|
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true},
|
||||||
|
#emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true},
|
||||||
|
#emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true},
|
||||||
|
#emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true},
|
||||||
|
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true},
|
||||||
|
#emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false}
|
||||||
|
],
|
||||||
|
?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
to_rfc3339(Second) ->
|
to_rfc3339(Second) ->
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
{password, <<"pass">>}
|
{password, <<"pass">>}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address].
|
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
|
@ -85,7 +85,6 @@ t_trace_clientid(_Config) ->
|
||||||
emqtt:connect(T),
|
emqtt:connect(T),
|
||||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||||
emqtt:ping(T),
|
emqtt:ping(T),
|
||||||
|
|
||||||
ok = filesync(<<"client">>, clientid),
|
ok = filesync(<<"client">>, clientid),
|
||||||
ok = filesync(<<"client2">>, clientid),
|
ok = filesync(<<"client2">>, clientid),
|
||||||
ok = filesync(<<"client3">>, clientid),
|
ok = filesync(<<"client3">>, clientid),
|
||||||
|
@ -106,6 +105,22 @@ t_trace_clientid(_Config) ->
|
||||||
emqtt:disconnect(T),
|
emqtt:disconnect(T),
|
||||||
?assertEqual([], emqx_trace_handler:running()).
|
?assertEqual([], emqx_trace_handler:running()).
|
||||||
|
|
||||||
|
t_trace_clientid_utf8(_) ->
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
|
|
||||||
|
Utf8Id = <<"client 漢字編碼"/utf8>>,
|
||||||
|
ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"),
|
||||||
|
{ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
|
||||||
|
emqtt:connect(T),
|
||||||
|
[begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)],
|
||||||
|
emqtt:ping(T),
|
||||||
|
|
||||||
|
ok = filesync(Utf8Id, clientid),
|
||||||
|
ok = emqx_trace_handler:uninstall(clientid, Utf8Id),
|
||||||
|
emqtt:disconnect(T),
|
||||||
|
?assertEqual([], emqx_trace_handler:running()),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_trace_topic(_Config) ->
|
t_trace_topic(_Config) ->
|
||||||
{ok, T} = emqtt:start_link(?CLIENT),
|
{ok, T} = emqtt:start_link(?CLIENT),
|
||||||
emqtt:connect(T),
|
emqtt:connect(T),
|
||||||
|
@ -161,6 +176,7 @@ t_trace_ip_address(_Config) ->
|
||||||
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
||||||
ok = filesync(<<"127.0.0.1">>, ip_address),
|
ok = filesync(<<"127.0.0.1">>, ip_address),
|
||||||
ok = filesync(<<"192.168.1.1">>, ip_address),
|
ok = filesync(<<"192.168.1.1">>, ip_address),
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
%% Verify the tracing file exits
|
||||||
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
||||||
?assert(filelib:is_regular("tmp/ip_trace_y.log")),
|
?assert(filelib:is_regular("tmp/ip_trace_y.log")),
|
||||||
|
@ -198,6 +214,7 @@ t_trace_ip_address(_Config) ->
|
||||||
emqtt:disconnect(T),
|
emqtt:disconnect(T),
|
||||||
?assertEqual([], emqx_trace_handler:running()).
|
?assertEqual([], emqx_trace_handler:running()).
|
||||||
|
|
||||||
|
|
||||||
filesync(Name, Type) ->
|
filesync(Name, Type) ->
|
||||||
ct:sleep(50),
|
ct:sleep(50),
|
||||||
filesync(Name, Type, 3).
|
filesync(Name, Type, 3).
|
||||||
|
|
|
@ -0,0 +1,408 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_mgmt_api_trace).
|
||||||
|
|
||||||
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-export([ api_spec/0
|
||||||
|
, fields/1
|
||||||
|
, paths/0
|
||||||
|
, schema/1
|
||||||
|
, namespace/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ trace/2
|
||||||
|
, delete_trace/2
|
||||||
|
, update_trace/2
|
||||||
|
, download_trace_log/2
|
||||||
|
, stream_log_file/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([validate_name/1]).
|
||||||
|
|
||||||
|
%% for rpc
|
||||||
|
-export([read_trace_file/3
|
||||||
|
, get_trace_size/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
||||||
|
-define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}).
|
||||||
|
|
||||||
|
namespace() -> "trace".
|
||||||
|
|
||||||
|
api_spec() ->
|
||||||
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
|
||||||
|
|
||||||
|
paths() ->
|
||||||
|
["/trace", "/trace/:name/stop", "/trace/:name/download", "/trace/:name/log", "/trace/:name"].
|
||||||
|
|
||||||
|
|
||||||
|
schema("/trace") ->
|
||||||
|
#{
|
||||||
|
'operationId' => trace,
|
||||||
|
get => #{
|
||||||
|
description => "List all trace",
|
||||||
|
responses => #{
|
||||||
|
200 => hoconsc:ref(trace)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
post => #{
|
||||||
|
description => "Create new trace",
|
||||||
|
'requestBody' => delete([status, log_size], fields(trace)),
|
||||||
|
responses => #{
|
||||||
|
200 => hoconsc:ref(trace)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
delete => #{
|
||||||
|
description => "Clear all traces",
|
||||||
|
responses => #{
|
||||||
|
204 => <<"No Content">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/trace/:name") ->
|
||||||
|
#{
|
||||||
|
'operationId' => delete_trace,
|
||||||
|
delete => #{
|
||||||
|
description => "Delete trace by name",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Delete successfully">>,
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/trace/:name/stop") ->
|
||||||
|
#{
|
||||||
|
'operationId' => update_trace,
|
||||||
|
put => #{
|
||||||
|
description => "Stop trace by name",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
responses => #{
|
||||||
|
200 => hoconsc:ref(trace),
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/trace/:name/download") ->
|
||||||
|
#{
|
||||||
|
'operationId' => download_trace_log,
|
||||||
|
get => #{
|
||||||
|
description => "Download trace log by name",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
%% todo zip file octet-stream
|
||||||
|
responses => #{
|
||||||
|
200 => <<"TODO octet-stream">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/trace/:name/log") ->
|
||||||
|
#{
|
||||||
|
'operationId' => stream_log_file,
|
||||||
|
get => #{
|
||||||
|
description => "view trace log",
|
||||||
|
parameters => [
|
||||||
|
hoconsc:ref(name),
|
||||||
|
hoconsc:ref(bytes),
|
||||||
|
hoconsc:ref(position),
|
||||||
|
hoconsc:ref(node)
|
||||||
|
],
|
||||||
|
%% todo response data
|
||||||
|
responses => #{
|
||||||
|
200 => <<"TODO">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
fields(trace) ->
|
||||||
|
[
|
||||||
|
{name, hoconsc:mk(binary(),
|
||||||
|
#{desc => "Unique and format by [a-zA-Z0-9-_]",
|
||||||
|
validator => fun ?MODULE:validate_name/1,
|
||||||
|
nullable => false,
|
||||||
|
example => <<"EMQX-TRACE-1">>})},
|
||||||
|
{type, hoconsc:mk(hoconsc:enum([clientid, topic, ip_address]),
|
||||||
|
#{desc => """Filter type""",
|
||||||
|
nullable => false,
|
||||||
|
example => <<"clientid">>})},
|
||||||
|
{topic, hoconsc:mk(binary(),
|
||||||
|
#{desc => """support mqtt wildcard topic.""",
|
||||||
|
nullable => true,
|
||||||
|
example => <<"/dev/#">>})},
|
||||||
|
{clientid, hoconsc:mk(binary(),
|
||||||
|
#{desc => """mqtt clientid.""",
|
||||||
|
nullable => true,
|
||||||
|
example => <<"dev-001">>})},
|
||||||
|
%% TODO add ip_address type in emqx_schema.erl
|
||||||
|
{ip_address, hoconsc:mk(binary(),
|
||||||
|
#{desc => "client ip address",
|
||||||
|
nullable => true,
|
||||||
|
example => <<"127.0.0.1">>
|
||||||
|
})},
|
||||||
|
{status, hoconsc:mk(hoconsc:enum([running, stopped, waiting]),
|
||||||
|
#{desc => "trace status",
|
||||||
|
nullable => true,
|
||||||
|
example => running
|
||||||
|
})},
|
||||||
|
{start_at, hoconsc:mk(binary(),
|
||||||
|
#{desc => "rfc3339 timestamp",
|
||||||
|
nullable => true,
|
||||||
|
example => <<"2021-11-04T18:17:38+08:00">>
|
||||||
|
})},
|
||||||
|
{end_at, hoconsc:mk(binary(),
|
||||||
|
#{desc => "rfc3339 timestamp",
|
||||||
|
nullable => true,
|
||||||
|
example => <<"2021-11-05T18:17:38+08:00">>
|
||||||
|
})},
|
||||||
|
{log_size, hoconsc:mk(hoconsc:array(map()),
|
||||||
|
#{desc => "trace log size",
|
||||||
|
example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
|
||||||
|
nullable => true})}
|
||||||
|
];
|
||||||
|
fields(name) ->
|
||||||
|
[{name, hoconsc:mk(binary(),
|
||||||
|
#{
|
||||||
|
desc => <<"[a-zA-Z0-9-_]">>,
|
||||||
|
example => <<"EMQX-TRACE-1">>,
|
||||||
|
in => path,
|
||||||
|
validator => fun ?MODULE:validate_name/1
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(node) ->
|
||||||
|
[{node, hoconsc:mk(binary(),
|
||||||
|
#{
|
||||||
|
desc => "Node name",
|
||||||
|
in => query,
|
||||||
|
nullable => true
|
||||||
|
})}];
|
||||||
|
fields(bytes) ->
|
||||||
|
[{bytes, hoconsc:mk(integer(),
|
||||||
|
#{
|
||||||
|
desc => "Maximum number of bytes to store in request",
|
||||||
|
in => query,
|
||||||
|
nullable => true,
|
||||||
|
default => 1000
|
||||||
|
})}];
|
||||||
|
fields(position) ->
|
||||||
|
[{position, hoconsc:mk(integer(),
|
||||||
|
#{
|
||||||
|
desc => "Offset from the current trace position.",
|
||||||
|
in => query,
|
||||||
|
nullable => true,
|
||||||
|
default => 0
|
||||||
|
})}].
|
||||||
|
|
||||||
|
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
|
||||||
|
|
||||||
|
validate_name(Name) ->
|
||||||
|
NameLen = byte_size(Name),
|
||||||
|
case NameLen > 0 andalso NameLen =< 256 of
|
||||||
|
true ->
|
||||||
|
case re:run(Name, ?NAME_RE) of
|
||||||
|
nomatch -> {error, "Name should be " ?NAME_RE};
|
||||||
|
_ -> ok
|
||||||
|
end;
|
||||||
|
false -> {error, "Name Length must =< 256"}
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete(Keys, Fields) ->
|
||||||
|
lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys).
|
||||||
|
|
||||||
|
trace(get, _Params) ->
|
||||||
|
case emqx_trace:list() of
|
||||||
|
[] -> {200, []};
|
||||||
|
List0 ->
|
||||||
|
List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end,
|
||||||
|
emqx_trace:format(List0)),
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000),
|
||||||
|
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Traces =
|
||||||
|
lists:map(fun(Trace = #{name := Name, start_at := Start,
|
||||||
|
end_at := End, enable := Enable, type := Type, filter := Filter}) ->
|
||||||
|
FileName = emqx_trace:filename(Name, Start),
|
||||||
|
LogSize = collect_file_size(Nodes, FileName, AllFileSize),
|
||||||
|
Trace0 = maps:without([enable, filter], Trace),
|
||||||
|
Trace0#{log_size => LogSize
|
||||||
|
, Type => iolist_to_binary(Filter)
|
||||||
|
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
|
||||||
|
, end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
|
||||||
|
, status => status(Enable, Start, End, Now)
|
||||||
|
}
|
||||||
|
end, List),
|
||||||
|
{200, Traces}
|
||||||
|
end;
|
||||||
|
trace(post, #{body := Param}) ->
|
||||||
|
case emqx_trace:create(Param) of
|
||||||
|
ok -> {200};
|
||||||
|
{error, {already_existed, Name}} ->
|
||||||
|
{400, #{
|
||||||
|
code => 'ALREADY_EXISTED',
|
||||||
|
message => ?TO_BIN([Name, " Already Exists"])
|
||||||
|
}};
|
||||||
|
{error, {duplicate_condition, Name}} ->
|
||||||
|
{400, #{
|
||||||
|
code => 'DUPLICATE_CONDITION',
|
||||||
|
message => ?TO_BIN([Name, " Duplication Condition"])
|
||||||
|
}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{400, #{
|
||||||
|
code => 'INCORRECT_PARAMS',
|
||||||
|
message => ?TO_BIN(Reason)
|
||||||
|
}}
|
||||||
|
end;
|
||||||
|
trace(delete, _Param) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
{200}.
|
||||||
|
|
||||||
|
delete_trace(delete, #{bindings := #{name := Name}}) ->
|
||||||
|
case emqx_trace:delete(Name) of
|
||||||
|
ok -> {200};
|
||||||
|
{error, not_found} -> ?NOT_FOUND(Name)
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_trace(put, #{bindings := #{name := Name}}) ->
|
||||||
|
case emqx_trace:update(Name, false) of
|
||||||
|
ok -> {200, #{enable => false, name => Name}};
|
||||||
|
{error, not_found} -> ?NOT_FOUND(Name)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
||||||
|
%% cowboy_compress_h will auto encode gzip format.
|
||||||
|
download_trace_log(get, #{bindings := #{name := Name}}) ->
|
||||||
|
case emqx_trace:get_trace_filename(Name) of
|
||||||
|
{ok, TraceLog} ->
|
||||||
|
TraceFiles = collect_trace_file(TraceLog),
|
||||||
|
ZipDir = emqx_trace:zip_dir(),
|
||||||
|
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
||||||
|
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
|
||||||
|
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
|
||||||
|
emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
||||||
|
{200, ZipFile};
|
||||||
|
{error, not_found} -> ?NOT_FOUND(Name)
|
||||||
|
end.
|
||||||
|
|
||||||
|
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
||||||
|
lists:foldl(fun(Res, Acc) ->
|
||||||
|
case Res of
|
||||||
|
{ok, Node, Bin} ->
|
||||||
|
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
||||||
|
case file:write_file(ZipName, Bin) of
|
||||||
|
ok -> [Node ++ "-" ++ TraceLog | Acc];
|
||||||
|
_ -> Acc
|
||||||
|
end;
|
||||||
|
{error, Node, Reason} ->
|
||||||
|
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], TraceFiles).
|
||||||
|
|
||||||
|
collect_trace_file(TraceLog) ->
|
||||||
|
cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
|
||||||
|
|
||||||
|
cluster_call(Mod, Fun, Args, Timeout) ->
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
||||||
|
BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
|
||||||
|
GoodRes.
|
||||||
|
|
||||||
|
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query} = T) ->
|
||||||
|
Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())),
|
||||||
|
Position = maps:get(<<"position">>, Query, 0),
|
||||||
|
Bytes = maps:get(<<"bytes">>, Query, 1000),
|
||||||
|
logger:error("~p", [T]),
|
||||||
|
case to_node(Node0) of
|
||||||
|
{ok, Node} ->
|
||||||
|
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||||
|
{ok, Bin} ->
|
||||||
|
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
||||||
|
{200, #{meta => Meta, items => Bin}};
|
||||||
|
{eof, Size} ->
|
||||||
|
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
||||||
|
{200, #{meta => Meta, items => <<"">>}};
|
||||||
|
{error, Reason} ->
|
||||||
|
logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]),
|
||||||
|
{400, #{code => 'READ_FILE_ERROR', message => Reason}};
|
||||||
|
{badrpc, nodedown} ->
|
||||||
|
{400, #{code => 'RPC_ERROR', message => "BadRpc node down"}}
|
||||||
|
end;
|
||||||
|
{error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_trace_size() ->
|
||||||
|
TraceDir = emqx_trace:trace_dir(),
|
||||||
|
Node = node(),
|
||||||
|
case file:list_dir(TraceDir) of
|
||||||
|
{ok, AllFiles} ->
|
||||||
|
lists:foldl(fun(File, Acc) ->
|
||||||
|
FullFileName = filename:join(TraceDir, File),
|
||||||
|
Acc#{{Node, File} => filelib:file_size(FullFileName)}
|
||||||
|
end, #{}, lists:delete("zip", AllFiles));
|
||||||
|
_ -> #{}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% this is an rpc call for stream_log_file/2
|
||||||
|
read_trace_file(Name, Position, Limit) ->
|
||||||
|
case emqx_trace:get_trace_filename(Name) of
|
||||||
|
{error, _} = Error -> Error;
|
||||||
|
{ok, TraceFile} ->
|
||||||
|
TraceDir = emqx_trace:trace_dir(),
|
||||||
|
TracePath = filename:join([TraceDir, TraceFile]),
|
||||||
|
read_file(TracePath, Position, Limit)
|
||||||
|
end.
|
||||||
|
|
||||||
|
read_file(Path, Offset, Bytes) ->
|
||||||
|
case file:open(Path, [read, raw, binary]) of
|
||||||
|
{ok, IoDevice} ->
|
||||||
|
try
|
||||||
|
_ = case Offset of
|
||||||
|
0 -> ok;
|
||||||
|
_ -> file:position(IoDevice, {bof, Offset})
|
||||||
|
end,
|
||||||
|
case file:read(IoDevice, Bytes) of
|
||||||
|
{ok, Bin} -> {ok, Bin};
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
eof ->
|
||||||
|
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
||||||
|
{eof, Size}
|
||||||
|
end
|
||||||
|
after
|
||||||
|
file:close(IoDevice)
|
||||||
|
end;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_node(Node) ->
|
||||||
|
try {ok, binary_to_existing_atom(Node)}
|
||||||
|
catch _:_ ->
|
||||||
|
{error, not_found}
|
||||||
|
end.
|
||||||
|
|
||||||
|
collect_file_size(Nodes, FileName, AllFiles) ->
|
||||||
|
lists:foldl(fun(Node, Acc) ->
|
||||||
|
Size = maps:get({Node, FileName}, AllFiles, 0),
|
||||||
|
Acc#{Node => Size}
|
||||||
|
end, #{}, Nodes).
|
||||||
|
|
||||||
|
status(false, _Start, _End, _Now) -> <<"stopped">>;
|
||||||
|
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
|
||||||
|
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
|
||||||
|
status(true, _Start, _End, _Now) -> <<"running">>.
|
|
@ -0,0 +1,190 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_mgmt_trace_api_SUITE).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
-define(HOST, "http://127.0.0.1:18083/").
|
||||||
|
-define(API_VERSION, "v5").
|
||||||
|
-define(BASE_PATH, "api").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_) ->
|
||||||
|
emqx_mgmt_api_test_util:end_suite().
|
||||||
|
|
||||||
|
t_http_test(_Config) ->
|
||||||
|
emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
Header = auth_header_(),
|
||||||
|
%% list
|
||||||
|
{ok, Empty} = request_api(get, api_path("trace"), Header),
|
||||||
|
?assertEqual([], json(Empty)),
|
||||||
|
%% create
|
||||||
|
ErrorTrace = #{},
|
||||||
|
{error, {"HTTP/1.1", 400, "Bad Request"}, Body} =
|
||||||
|
request_api(post, api_path("trace"), Header, ErrorTrace),
|
||||||
|
?assertEqual(
|
||||||
|
#{
|
||||||
|
<<"code">> => <<"BAD_REQUEST">>,
|
||||||
|
<<"message">> => <<"name : not_nullable">>
|
||||||
|
}, json(Body)),
|
||||||
|
|
||||||
|
Name = <<"test-name">>,
|
||||||
|
Trace = [
|
||||||
|
{<<"name">>, Name},
|
||||||
|
{<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/z">>}
|
||||||
|
],
|
||||||
|
|
||||||
|
{ok, Create} = request_api(post, api_path("trace"), Header, Trace),
|
||||||
|
?assertEqual(<<>>, Create),
|
||||||
|
|
||||||
|
{ok, List} = request_api(get, api_path("trace"), Header),
|
||||||
|
[Data] = json(List),
|
||||||
|
?assertEqual(Name, maps:get(<<"name">>, Data)),
|
||||||
|
|
||||||
|
%% update
|
||||||
|
{ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}),
|
||||||
|
?assertEqual(#{<<"enable">> => false,
|
||||||
|
<<"name">> => <<"test-name">>}, json(Update)),
|
||||||
|
|
||||||
|
{ok, List1} = request_api(get, api_path("trace"), Header),
|
||||||
|
[Data1] = json(List1),
|
||||||
|
Node = atom_to_binary(node()),
|
||||||
|
?assertMatch(#{
|
||||||
|
<<"status">> := <<"stopped">>,
|
||||||
|
<<"name">> := <<"test-name">>,
|
||||||
|
<<"log_size">> := #{Node := _},
|
||||||
|
<<"start_at">> := _,
|
||||||
|
<<"end_at">> := _,
|
||||||
|
<<"type">> := <<"topic">>,
|
||||||
|
<<"topic">> := <<"/x/y/z">>
|
||||||
|
}, Data1),
|
||||||
|
|
||||||
|
%% delete
|
||||||
|
{ok, Delete} = request_api(delete, api_path("trace/test-name"), Header),
|
||||||
|
?assertEqual(<<>>, Delete),
|
||||||
|
|
||||||
|
{error, {"HTTP/1.1", 404, "Not Found"}, DeleteNotFound}
|
||||||
|
= request_api(delete, api_path("trace/test-name"), Header),
|
||||||
|
?assertEqual(#{<<"code">> => <<"NOT_FOUND">>,
|
||||||
|
<<"message">> => <<"test-name NOT FOUND">>}, json(DeleteNotFound)),
|
||||||
|
|
||||||
|
{ok, List2} = request_api(get, api_path("trace"), Header),
|
||||||
|
?assertEqual([], json(List2)),
|
||||||
|
|
||||||
|
%% clear
|
||||||
|
{ok, Create1} = request_api(post, api_path("trace"), Header, Trace),
|
||||||
|
?assertEqual(<<>>, Create1),
|
||||||
|
|
||||||
|
{ok, Clear} = request_api(delete, api_path("trace"), Header),
|
||||||
|
?assertEqual(<<>>, Clear),
|
||||||
|
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_stream_log(_Config) ->
|
||||||
|
application:set_env(emqx, allow_anonymous, true),
|
||||||
|
emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
ClientId = <<"client-stream">>,
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Name = <<"test_stream_log">>,
|
||||||
|
Start = to_rfc3339(Now - 10),
|
||||||
|
ok = emqx_trace:create(#{<<"name">> => Name,
|
||||||
|
<<"type">> => clientid, <<"clientid">> => ClientId, <<"start_at">> => Start}),
|
||||||
|
ct:sleep(200),
|
||||||
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
[begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)],
|
||||||
|
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]),
|
||||||
|
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]),
|
||||||
|
ok = emqtt:disconnect(Client),
|
||||||
|
ct:sleep(200),
|
||||||
|
File = emqx_trace:log_file(Name, Now),
|
||||||
|
ct:pal("FileName: ~p", [File]),
|
||||||
|
{ok, FileBin} = file:read_file(File),
|
||||||
|
ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
|
||||||
|
Header = auth_header_(),
|
||||||
|
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header),
|
||||||
|
#{<<"meta">> := Meta, <<"items">> := Bin} = json(Binary),
|
||||||
|
?assertEqual(10, byte_size(Bin)),
|
||||||
|
?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta),
|
||||||
|
Path = api_path("trace/test_stream_log/log?position=20&bytes=10"),
|
||||||
|
{ok, Binary1} = request_api(get, Path, Header),
|
||||||
|
#{<<"meta">> := Meta1, <<"items">> := Bin1} = json(Binary1),
|
||||||
|
?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1),
|
||||||
|
?assertEqual(10, byte_size(Bin1)),
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
to_rfc3339(Second) ->
|
||||||
|
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||||
|
|
||||||
|
auth_header_() ->
|
||||||
|
auth_header_("admin", "public").
|
||||||
|
|
||||||
|
auth_header_(User, Pass) ->
|
||||||
|
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
|
||||||
|
{"Authorization", "Basic " ++ Encoded}.
|
||||||
|
|
||||||
|
request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}).
|
||||||
|
|
||||||
|
request_api(Method, Url, Auth, Body) ->
|
||||||
|
Request = {Url, [Auth], "application/json", emqx_json:encode(Body)},
|
||||||
|
do_request_api(Method, Request).
|
||||||
|
|
||||||
|
do_request_api(Method, Request) ->
|
||||||
|
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||||
|
case httpc:request(Method, Request, [], [{body_format, binary}]) of
|
||||||
|
{error, socket_closed_remotely} ->
|
||||||
|
{error, socket_closed_remotely};
|
||||||
|
{error, {shutdown, server_closed}} ->
|
||||||
|
{error, server_closed};
|
||||||
|
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return}}
|
||||||
|
when Code =:= 200 orelse Code =:= 201 ->
|
||||||
|
{ok, Return};
|
||||||
|
{ok, {Reason, _Header, Body}} ->
|
||||||
|
{error, Reason, Body}
|
||||||
|
end.
|
||||||
|
|
||||||
|
api_path(Path) ->
|
||||||
|
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]).
|
||||||
|
|
||||||
|
json(Data) ->
|
||||||
|
{ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx.
|
||||||
|
|
||||||
|
load() ->
|
||||||
|
emqx_trace:start_link().
|
||||||
|
|
||||||
|
unload() ->
|
||||||
|
gen_server:stop(emqx_trace).
|
Loading…
Reference in New Issue