feat(trace): move common trace module to plugin_libs (#6127)
* feat(trace): move common mod to plugin_libs * fix: elvis warning
This commit is contained in:
parent
9d4f2916c2
commit
7193cd4275
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_plugin_libs,
|
||||
[{description, "EMQ X Plugin utility libs"},
|
||||
{vsn, "4.3.2"},
|
||||
{vsn, "4.4.0"},
|
||||
{modules, []},
|
||||
{applications, [kernel,stdlib]},
|
||||
{env, []}
|
||||
|
|
|
@ -0,0 +1,487 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_trace).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-logger_header("[Trace]").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
||||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
-export([ start_link/0
|
||||
, list/0
|
||||
, list/1
|
||||
, get_trace_filename/1
|
||||
, create/1
|
||||
, delete/1
|
||||
, clear/0
|
||||
, update/2
|
||||
]).
|
||||
|
||||
-export([ format/1
|
||||
, zip_dir/0
|
||||
, trace_dir/0
|
||||
, trace_file/1
|
||||
, delete_files_after_send/2
|
||||
]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(TRACE, ?MODULE).
|
||||
-define(PACKETS, tuple_to_list(?TYPE_NAMES)).
|
||||
-define(MAX_SIZE, 30).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([log_file/2]).
|
||||
-endif.
|
||||
|
||||
-record(?TRACE,
|
||||
{ name :: binary() | undefined | '_'
|
||||
, type :: clientid | topic | undefined | '_'
|
||||
, topic :: emqx_types:topic() | undefined | '_'
|
||||
, clientid :: emqx_types:clientid() | undefined | '_'
|
||||
, packets = [] :: list() | '_'
|
||||
, enable = true :: boolean() | '_'
|
||||
, start_at :: integer() | undefined | binary() | '_'
|
||||
, end_at :: integer() | undefined | binary() | '_'
|
||||
, log_size = #{} :: map() | '_'
|
||||
}).
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = ekka_mnesia:create_table(?TRACE, [
|
||||
{type, set},
|
||||
{disc_copies, [node()]},
|
||||
{record_name, ?TRACE},
|
||||
{attributes, record_info(fields, ?TRACE)}]);
|
||||
mnesia(copy) ->
|
||||
ok = ekka_mnesia:copy_table(?TRACE, disc_copies).
|
||||
|
||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec list() -> [tuple()].
|
||||
list() ->
|
||||
ets:match_object(?TRACE, #?TRACE{_ = '_'}).
|
||||
|
||||
-spec list(boolean()) -> [tuple()].
|
||||
list(Enable) ->
|
||||
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
|
||||
|
||||
-spec create([{Key :: binary(), Value :: binary()}]) ->
|
||||
ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
|
||||
create(Trace) ->
|
||||
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
||||
true ->
|
||||
case to_trace(Trace) of
|
||||
{ok, TraceRec} -> create_new_trace(TraceRec);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
false ->
|
||||
{error, """The number of traces created has reached the maximum,
|
||||
please delete the useless ones first"""}
|
||||
end.
|
||||
|
||||
-spec delete(Name :: binary()) -> ok | {error, not_found}.
|
||||
delete(Name) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name) of
|
||||
[_] -> mnesia:delete(?TRACE, Name, write);
|
||||
[] -> mnesia:abort(not_found)
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec clear() -> ok | {error, Reason :: term()}.
|
||||
clear() ->
|
||||
case mnesia:clear_table(?TRACE) of
|
||||
{atomic, ok} -> ok;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
-spec update(Name :: binary(), Enable :: boolean()) ->
|
||||
ok | {error, not_found | finished}.
|
||||
update(Name, Enable) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name) of
|
||||
[] -> mnesia:abort(not_found);
|
||||
[#?TRACE{enable = Enable}] -> ok;
|
||||
[Rec] ->
|
||||
case erlang:system_time(second) >= Rec#?TRACE.end_at of
|
||||
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
|
||||
true -> mnesia:abort(finished)
|
||||
end
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec get_trace_filename(Name :: binary()) ->
|
||||
{ok, FileName :: string()} | {error, not_found}.
|
||||
get_trace_filename(Name) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name, read) of
|
||||
[] -> mnesia:abort(not_found);
|
||||
[#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
|
||||
end end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec trace_file(File :: list()) ->
|
||||
{ok, Node :: list(), Binary :: binary()} |
|
||||
{error, Node :: list(), Reason :: term()}.
|
||||
trace_file(File) ->
|
||||
FileName = filename:join(trace_dir(), File),
|
||||
Node = atom_to_list(node()),
|
||||
case file:read_file(FileName) of
|
||||
{ok, Bin} -> {ok, Node, Bin};
|
||||
{error, Reason} -> {error, Node, Reason}
|
||||
end.
|
||||
|
||||
delete_files_after_send(TraceLog, Zips) ->
|
||||
gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}).
|
||||
|
||||
-spec format(list(#?TRACE{})) -> list(map()).
|
||||
format(Traces) ->
|
||||
Fields = record_info(fields, ?TRACE),
|
||||
lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) ->
|
||||
Trace = Trace0#?TRACE{
|
||||
start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)),
|
||||
end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt))
|
||||
},
|
||||
[_ | Values] = tuple_to_list(Trace),
|
||||
maps:from_list(lists:zip(Fields, Values))
|
||||
end, Traces).
|
||||
|
||||
init([]) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
||||
ok = filelib:ensure_dir(trace_dir()),
|
||||
ok = filelib:ensure_dir(zip_dir()),
|
||||
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
||||
Traces = get_enable_trace(),
|
||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||
TRef = update_trace(Traces),
|
||||
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) ->
|
||||
erlang:monitor(process, Pid),
|
||||
{noreply, State#{monitors => Monitors#{Pid => Files}}};
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
|
||||
case maps:take(Pid, Monitors) of
|
||||
error -> {noreply, State};
|
||||
{Files, NewMonitors} ->
|
||||
lists:foreach(fun(F) -> file:delete(F) end, Files),
|
||||
{noreply, State#{monitors => NewMonitors}}
|
||||
end;
|
||||
handle_info({timeout, TRef, update_trace},
|
||||
#{timer := TRef, primary_log_level := OriginLogLevel} = State) ->
|
||||
Traces = get_enable_trace(),
|
||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||
NextTRef = update_trace(Traces),
|
||||
{noreply, State#{timer => NextTRef}};
|
||||
|
||||
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
|
||||
emqx_misc:cancel_timer(TRef),
|
||||
handle_info({timeout, TRef, update_trace}, State);
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
|
||||
ok = set_log_primary_level(OriginLogLevel),
|
||||
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
|
||||
emqx_misc:cancel_timer(TRef),
|
||||
stop_all_trace_handler(),
|
||||
_ = file:del_dir_r(zip_dir()),
|
||||
ok.
|
||||
|
||||
code_change(_, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
create_new_trace(Trace) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Trace#?TRACE.name) of
|
||||
[] ->
|
||||
#?TRACE{start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets} = Trace,
|
||||
Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets},
|
||||
case mnesia:match_object(?TRACE, Match, read) of
|
||||
[] -> mnesia:write(?TRACE, Trace, write);
|
||||
[#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
|
||||
end;
|
||||
[#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name})
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
update_trace(Traces) ->
|
||||
Now = erlang:system_time(second),
|
||||
{_Waiting, Running, Finished} = classify_by_time(Traces, Now),
|
||||
disable_finished(Finished),
|
||||
Started = already_running(),
|
||||
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
||||
NeedStop = AllStarted -- NeedRunning,
|
||||
ok = stop_trace(NeedStop, Started),
|
||||
clean_stale_trace_files(),
|
||||
NextTime = find_closest_time(Traces, Now),
|
||||
emqx_misc:start_timer(NextTime, update_trace).
|
||||
|
||||
stop_all_trace_handler() ->
|
||||
lists:foreach(fun(#{type := Type, name := Name} = Trace) ->
|
||||
_ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name)
|
||||
end
|
||||
, already_running()).
|
||||
|
||||
already_running() ->
|
||||
emqx_tracer:lookup_traces().
|
||||
|
||||
get_enable_trace() ->
|
||||
{atomic, Traces} =
|
||||
mnesia:transaction(fun() ->
|
||||
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
||||
end),
|
||||
Traces.
|
||||
|
||||
find_closest_time(Traces, Now) ->
|
||||
Sec =
|
||||
lists:foldl(
|
||||
fun(#?TRACE{start_at = Start, end_at = End}, Closest)
|
||||
when Start >= Now andalso Now < End -> %% running
|
||||
min(End - Now, Closest);
|
||||
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
|
||||
min(Now - Start, Closest);
|
||||
(_, Closest) -> Closest %% finished
|
||||
end, 60 * 15, Traces),
|
||||
timer:seconds(Sec).
|
||||
|
||||
disable_finished([]) -> ok;
|
||||
disable_finished(Traces) ->
|
||||
NameWithLogSize =
|
||||
lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) ->
|
||||
FileSize = filelib:file_size(log_file(Name, StartAt)),
|
||||
{Name, FileSize} end, Traces),
|
||||
transaction(fun() ->
|
||||
lists:map(fun({Name, LogSize}) ->
|
||||
case mnesia:read(?TRACE, Name, write) of
|
||||
[] -> ok;
|
||||
[Trace = #?TRACE{log_size = Logs}] ->
|
||||
mnesia:write(?TRACE, Trace#?TRACE{enable = false,
|
||||
log_size = Logs#{node() => LogSize}}, write)
|
||||
end end, NameWithLogSize)
|
||||
end).
|
||||
|
||||
start_trace(Traces, Started0) ->
|
||||
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
|
||||
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
|
||||
case lists:member(Name, StartedAcc) of
|
||||
true -> {[Name | Running], StartedAcc};
|
||||
false ->
|
||||
case start_trace(Trace) of
|
||||
ok -> {[Name | Running], [Name | StartedAcc]};
|
||||
Error ->
|
||||
?LOG(error, "(~p)start trace failed by:~p", [Name, Error]),
|
||||
{[Name | Running], StartedAcc}
|
||||
end
|
||||
end
|
||||
end, {[], Started}, Traces).
|
||||
|
||||
start_trace(Trace) ->
|
||||
#?TRACE{name = Name
|
||||
, type = Type
|
||||
, clientid = ClientId
|
||||
, topic = Topic
|
||||
, packets = Packets
|
||||
, start_at = Start
|
||||
} = Trace,
|
||||
Who0 = #{name => Name, labels => Packets},
|
||||
Who =
|
||||
case Type of
|
||||
topic -> Who0#{type => topic, topic => Topic};
|
||||
clientid -> Who0#{type => clientid, clientid => ClientId}
|
||||
end,
|
||||
case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of
|
||||
ok -> ok;
|
||||
{error, {already_exist, _}} -> ok;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
stop_trace(Finished, Started) ->
|
||||
lists:foreach(fun(#{name := Name, type := Type} = Trace) ->
|
||||
case lists:member(Name, Finished) of
|
||||
true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name);
|
||||
false -> ok
|
||||
end
|
||||
end, Started).
|
||||
|
||||
clean_stale_trace_files() ->
|
||||
TraceDir = trace_dir(),
|
||||
case file:list_dir(TraceDir) of
|
||||
{ok, AllFiles} when AllFiles =/= ["zip"] ->
|
||||
FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end,
|
||||
KeepFiles = lists:map(FileFun, list()),
|
||||
case AllFiles -- ["zip" | KeepFiles] of
|
||||
[] -> ok;
|
||||
DeleteFiles ->
|
||||
DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end,
|
||||
lists:foreach(DelFun, DeleteFiles)
|
||||
end;
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
classify_by_time(Traces, Now) ->
|
||||
classify_by_time(Traces, Now, [], [], []).
|
||||
|
||||
classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish};
|
||||
classify_by_time([Trace = #?TRACE{start_at = Start} | Traces],
|
||||
Now, Wait, Run, Finish) when Start > Now ->
|
||||
classify_by_time(Traces, Now, [Trace | Wait], Run, Finish);
|
||||
classify_by_time([Trace = #?TRACE{end_at = End} | Traces],
|
||||
Now, Wait, Run, Finish) when End =< Now ->
|
||||
classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]);
|
||||
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
|
||||
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
||||
|
||||
to_trace(TraceList) ->
|
||||
case to_trace(TraceList, #?TRACE{}) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
{ok, #?TRACE{name = undefined}} ->
|
||||
{error, "name required"};
|
||||
{ok, #?TRACE{type = undefined}} ->
|
||||
{error, "type required"};
|
||||
{ok, #?TRACE{topic = undefined, clientid = undefined}} ->
|
||||
{error, "topic/clientid cannot be both empty"};
|
||||
{ok, Trace} ->
|
||||
case fill_default(Trace) of
|
||||
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
||||
{error, "failed by start_at >= end_at"};
|
||||
Trace1 -> {ok, Trace1}
|
||||
end
|
||||
end.
|
||||
|
||||
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
||||
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
||||
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
||||
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
|
||||
fill_default(Trace) -> Trace.
|
||||
|
||||
to_trace([], Rec) -> {ok, Rec};
|
||||
to_trace([{<<"name">>, Name} | Trace], Rec) ->
|
||||
case binary:match(Name, [<<"/">>], []) of
|
||||
nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
|
||||
_ -> {error, "name cannot contain /"}
|
||||
end;
|
||||
to_trace([{<<"type">>, Type} | Trace], Rec) ->
|
||||
case lists:member(Type, [<<"clientid">>, <<"topic">>]) of
|
||||
true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
|
||||
false -> {error, "incorrect type: only support clientid/topic"}
|
||||
end;
|
||||
to_trace([{<<"topic">>, Topic} | Trace], Rec) ->
|
||||
case validate_topic(Topic) of
|
||||
ok -> to_trace(Trace, Rec#?TRACE{topic = Topic});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"start_at">>, StartAt} | Trace], Rec) ->
|
||||
case to_system_second(StartAt) of
|
||||
{ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"end_at">>, EndAt} | Trace], Rec) ->
|
||||
Now = erlang:system_time(second),
|
||||
case to_system_second(EndAt) of
|
||||
{ok, Sec} when Sec > Now ->
|
||||
to_trace(Trace, Rec#?TRACE{end_at = Sec});
|
||||
{ok, _Sec} ->
|
||||
{error, "end_at time has already passed"};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
to_trace([{<<"clientid">>, ClientId} | Trace], Rec) ->
|
||||
to_trace(Trace, Rec#?TRACE{clientid = ClientId});
|
||||
to_trace([{<<"packets">>, PacketList} | Trace], Rec) ->
|
||||
case to_packets(PacketList) of
|
||||
{ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets});
|
||||
{error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])}
|
||||
end;
|
||||
to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
|
||||
|
||||
validate_topic(TopicName) ->
|
||||
try emqx_topic:validate(name, TopicName) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error ->
|
||||
{error, io_lib:format("~s invalid by ~p", [TopicName, Error])}
|
||||
end.
|
||||
|
||||
to_system_second(At) ->
|
||||
try
|
||||
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
|
||||
{ok, Sec}
|
||||
catch error: {badmatch, _} ->
|
||||
{error, ["The rfc3339 specification not satisfied: ", At]}
|
||||
end.
|
||||
|
||||
to_packets(Packets) when is_list(Packets) ->
|
||||
AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets),
|
||||
case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of
|
||||
[] -> {ok, AtomTypes};
|
||||
InvalidE -> {error, InvalidE}
|
||||
end;
|
||||
to_packets(Packets) -> {error, Packets}.
|
||||
|
||||
zip_dir() ->
|
||||
trace_dir() ++ "zip/".
|
||||
|
||||
trace_dir() ->
|
||||
filename:join(emqx:get_env(data_dir), "trace") ++ "/".
|
||||
|
||||
log_file(Name, Start) ->
|
||||
filename:join(trace_dir(), filename(Name, Start)).
|
||||
|
||||
filename(Name, Start) ->
|
||||
[Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
|
||||
lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
|
||||
|
||||
transaction(Tran) ->
|
||||
case mnesia:transaction(Tran) of
|
||||
{atomic, Res} -> Res;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel);
|
||||
update_log_primary_level(_, _) -> set_log_primary_level(debug).
|
||||
|
||||
set_log_primary_level(NewLevel) ->
|
||||
case NewLevel =/= emqx_logger:get_primary_log_level() of
|
||||
true -> emqx_logger:set_primary_log_level(NewLevel);
|
||||
false -> ok
|
||||
end.
|
|
@ -0,0 +1,147 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_trace_api).
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([ list_trace/2
|
||||
, create_trace/2
|
||||
, update_trace/2
|
||||
, delete_trace/2
|
||||
, clear_traces/2
|
||||
, download_zip_log/2
|
||||
, stream_log_file/2
|
||||
]).
|
||||
-export([read_trace_file/3]).
|
||||
|
||||
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
||||
-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])}).
|
||||
|
||||
list_trace(_, Params) ->
|
||||
List =
|
||||
case Params of
|
||||
[{<<"enable">>, Enable}] -> emqx_trace:list(Enable);
|
||||
_ -> emqx_trace:list()
|
||||
end,
|
||||
{ok, emqx_trace:format(List)}.
|
||||
|
||||
create_trace(_, Param) ->
|
||||
case emqx_trace:create(Param) of
|
||||
ok -> ok;
|
||||
{error, {already_existed, Name}} ->
|
||||
{error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])};
|
||||
{error, {duplicate_condition, Name}} ->
|
||||
{error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])};
|
||||
{error, Reason} ->
|
||||
{error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}
|
||||
end.
|
||||
|
||||
delete_trace(#{name := Name}, _Param) ->
|
||||
case emqx_trace:delete(Name) of
|
||||
ok -> ok;
|
||||
{error, not_found} -> ?NOT_FOUND(Name)
|
||||
end.
|
||||
|
||||
clear_traces(_, _) ->
|
||||
emqx_trace:clear().
|
||||
|
||||
update_trace(#{name := Name, operation := Operation}, _Param) ->
|
||||
Enable = case Operation of disable -> false; enable -> true end,
|
||||
case emqx_trace:update(Name, Enable) of
|
||||
ok -> {ok, #{enable => Enable, name => Name}};
|
||||
{error, not_found} -> ?NOT_FOUND(Name)
|
||||
end.
|
||||
|
||||
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
||||
%% cowboy_compress_h will auto encode gzip format.
|
||||
download_zip_log(#{name := Name}, _Param) ->
|
||||
case emqx_trace:get_trace_filename(Name) of
|
||||
{ok, TraceLog} ->
|
||||
TraceFiles = collect_trace_file(TraceLog),
|
||||
ZipDir = emqx_trace:zip_dir(),
|
||||
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
||||
ZipFileName = ZipDir ++ TraceLog,
|
||||
{ok, ZipFile} = zip:zip(ZipFileName, Zips),
|
||||
emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
||||
{ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
||||
lists:foldl(fun(Res, Acc) ->
|
||||
case Res of
|
||||
{ok, Node, Bin} ->
|
||||
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
||||
ok = file:write_file(ZipName, Bin),
|
||||
[ZipName | Acc];
|
||||
{error, Node, Reason} ->
|
||||
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
||||
Acc
|
||||
end
|
||||
end, [], TraceFiles).
|
||||
|
||||
collect_trace_file(TraceLog) ->
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
{Files, BadNodes} = rpc:multicall(Nodes, emqx_trace, trace_file, [TraceLog], 60000),
|
||||
BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]),
|
||||
Files.
|
||||
|
||||
%% _page as position and _limit as bytes for front-end reusing components
|
||||
stream_log_file(#{name := Name}, Params) ->
|
||||
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
|
||||
Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>),
|
||||
Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>),
|
||||
Node = binary_to_existing_atom(Node0),
|
||||
Position = binary_to_integer(Position0),
|
||||
Bytes = binary_to_integer(Bytes0),
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
{ok, Bin} ->
|
||||
Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes},
|
||||
{ok, #{meta => Meta, items => Bin}};
|
||||
eof ->
|
||||
Meta = #{<<"page">> => Position, <<"limit">> => Bytes},
|
||||
{ok, #{meta => Meta, items => <<"">>}};
|
||||
{error, Reason} ->
|
||||
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% this is an rpc call for stream_log_file/2
|
||||
read_trace_file(Name, Position, Limit) ->
|
||||
TraceDir = emqx_trace:trace_dir(),
|
||||
{ok, AllFiles} = file:list_dir(TraceDir),
|
||||
TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_",
|
||||
Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end,
|
||||
case lists:filter(Filter, AllFiles) of
|
||||
[TraceFile] ->
|
||||
TracePath = filename:join([TraceDir, TraceFile]),
|
||||
read_file(TracePath, Position, Limit);
|
||||
[] -> {error, not_found}
|
||||
end.
|
||||
|
||||
read_file(Path, Offset, Bytes) ->
|
||||
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
|
||||
try
|
||||
_ = case Offset of
|
||||
0 -> ok;
|
||||
_ -> file:position(IoDevice, {bof, Offset})
|
||||
end,
|
||||
file:read(IoDevice, Bytes)
|
||||
after
|
||||
file:close(IoDevice)
|
||||
end.
|
|
@ -0,0 +1,353 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_trace_SUITE).
|
||||
|
||||
%% API
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
-record(emqx_trace, {
|
||||
name,
|
||||
type,
|
||||
topic,
|
||||
clientid,
|
||||
packets = [],
|
||||
enable = true,
|
||||
start_at,
|
||||
end_at,
|
||||
log_size = #{}
|
||||
}).
|
||||
|
||||
-define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL'
|
||||
, 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK'
|
||||
, 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_plugin_libs),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_base_create_delete(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
End = to_rfc3339(Now + 30 * 60),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"start_at">>, Start},
|
||||
{<<"end_at">>, End}
|
||||
],
|
||||
AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}),
|
||||
ok = emqx_trace:create(Trace),
|
||||
?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)),
|
||||
?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)),
|
||||
[TraceRec] = emqx_trace:list(),
|
||||
Expect = #emqx_trace{
|
||||
name = Name,
|
||||
type = clientid,
|
||||
topic = undefined,
|
||||
clientid = ClientId,
|
||||
packets = ?PACKETS,
|
||||
start_at = Now,
|
||||
end_at = Now + 30 * 60
|
||||
},
|
||||
?assertEqual(Expect, TraceRec),
|
||||
ExpectFormat = [
|
||||
#{
|
||||
clientid => <<"test-device">>,
|
||||
enable => true,
|
||||
type => clientid,
|
||||
packets => ?PACKETS,
|
||||
name => <<"name1">>,
|
||||
start_at => Start,
|
||||
end_at => End,
|
||||
log_size => #{},
|
||||
topic => undefined
|
||||
}
|
||||
],
|
||||
?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])),
|
||||
?assertEqual(ok, emqx_trace:delete(Name)),
|
||||
?assertEqual({error, not_found}, emqx_trace:delete(Name)),
|
||||
?assertEqual([], emqx_trace:list()),
|
||||
ok.
|
||||
|
||||
t_create_size_max(_Config) ->
|
||||
emqx_trace:clear(),
|
||||
lists:map(fun(Seq) ->
|
||||
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||
Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||
ok = emqx_trace:create(Trace)
|
||||
end, lists:seq(1, 30)),
|
||||
Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}],
|
||||
{error, _} = emqx_trace:create(Trace31),
|
||||
ok = emqx_trace:delete(<<"name30">>),
|
||||
ok = emqx_trace:create(Trace31),
|
||||
?assertEqual(30, erlang:length(emqx_trace:list())),
|
||||
ok.
|
||||
|
||||
t_create_failed(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
UnknownField = [{<<"unknown">>, 12}],
|
||||
{error, Reason1} = emqx_trace:create(UnknownField),
|
||||
?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)),
|
||||
|
||||
InvalidTopic = [{<<"topic">>, "#/#//"}],
|
||||
{error, Reason2} = emqx_trace:create(InvalidTopic),
|
||||
?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||
|
||||
InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
|
||||
{error, Reason3} = emqx_trace:create(InvalidStart),
|
||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||
iolist_to_binary(Reason3)),
|
||||
|
||||
InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}],
|
||||
{error, Reason4} = emqx_trace:create(InvalidEnd),
|
||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||
iolist_to_binary(Reason4)),
|
||||
|
||||
InvalidPackets = [{<<"packets">>, [<<"publish">>]}],
|
||||
{error, Reason5} = emqx_trace:create(InvalidPackets),
|
||||
?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)),
|
||||
|
||||
InvalidPackets2 = [{<<"packets">>, <<"publish">>}],
|
||||
{error, Reason6} = emqx_trace:create(InvalidPackets2),
|
||||
?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)),
|
||||
|
||||
{error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
|
||||
?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)),
|
||||
|
||||
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
|
||||
{<<"type">>, <<"clientid">>}],
|
||||
{error, Reason9} = emqx_trace:create(InvalidPackets4),
|
||||
?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
|
||||
|
||||
?assertEqual({error, "type required"}, emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}])),
|
||||
|
||||
?assertEqual({error, "incorrect type: only support clientid/topic"},
|
||||
emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
||||
ok.
|
||||
|
||||
t_create_default(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
{error, "name required"} = emqx_trace:create([]),
|
||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]),
|
||||
[#emqx_trace{packets = Packets}] = emqx_trace:list(),
|
||||
?assertEqual([], Packets),
|
||||
ok = emqx_trace:clear(),
|
||||
Trace = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
||||
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
|
||||
],
|
||||
{error, "end_at time has already passed"} = emqx_trace:create(Trace),
|
||||
Now = erlang:system_time(second),
|
||||
Trace2 = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, to_rfc3339(Now + 10)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 3)}
|
||||
],
|
||||
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
|
||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||
[#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
|
||||
?assertEqual(10 * 60, End - Start),
|
||||
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
||||
ok.
|
||||
|
||||
t_update_enable(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
Name = <<"test-name">>,
|
||||
Now = erlang:system_time(second),
|
||||
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||
ok = emqx_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||
[#emqx_trace{enable = Enable}] = emqx_trace:list(),
|
||||
?assertEqual(Enable, true),
|
||||
ok = emqx_trace:update(Name, false),
|
||||
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||
ok = emqx_trace:update(Name, false),
|
||||
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||
ok = emqx_trace:update(Name, true),
|
||||
[#emqx_trace{enable = true}] = emqx_trace:list(),
|
||||
ok = emqx_trace:update(Name, false),
|
||||
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||
?assertEqual({error, not_found}, emqx_trace:update(<<"Name not found">>, true)),
|
||||
ct:sleep(2100),
|
||||
?assertEqual({error, finished}, emqx_trace:update(Name, true)),
|
||||
ok.
|
||||
|
||||
t_load_state(_Config) ->
|
||||
emqx_trace:clear(),
|
||||
load(),
|
||||
Now = erlang:system_time(second),
|
||||
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 2)}],
|
||||
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 8)}],
|
||||
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
|
||||
{<<"end_at">>, to_rfc3339(Now)}],
|
||||
ok = emqx_trace:create(Running),
|
||||
ok = emqx_trace:create(Waiting),
|
||||
{error, "end_at time has already passed"} = emqx_trace:create(Finished),
|
||||
Traces = emqx_trace:format(emqx_trace:list()),
|
||||
?assertEqual(2, erlang:length(Traces)),
|
||||
Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces),
|
||||
ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}],
|
||||
?assertEqual(ExpectEnables, lists:sort(Enables)),
|
||||
ct:sleep(3500),
|
||||
Traces2 = emqx_trace:format(emqx_trace:list()),
|
||||
?assertEqual(2, erlang:length(Traces2)),
|
||||
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
||||
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
||||
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
t_client_event(_Config) ->
|
||||
application:set_env(emqx, allow_anonymous, true),
|
||||
emqx_trace:clear(),
|
||||
ClientId = <<"client-test">>,
|
||||
load(),
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
Name = <<"test_client_id_event">>,
|
||||
ok = emqx_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
emqtt:ping(Client),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
||||
ct:sleep(200),
|
||||
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
||||
ok = emqtt:disconnect(Client),
|
||||
ct:sleep(200),
|
||||
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
||||
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
||||
?assert(erlang:byte_size(Bin) > 0),
|
||||
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
||||
?assert(erlang:byte_size(Bin3) > 0),
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
t_get_log_filename(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
load(),
|
||||
Now = erlang:system_time(second),
|
||||
Start = calendar:system_time_to_rfc3339(Now),
|
||||
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"start_at">>, list_to_binary(Start)},
|
||||
{<<"end_at">>, list_to_binary(End)}
|
||||
],
|
||||
ok = emqx_trace:create(Trace),
|
||||
?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)),
|
||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||
ct:sleep(3000),
|
||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
t_trace_file(_Config) ->
|
||||
FileName = "test.log",
|
||||
Content = <<"test \n test">>,
|
||||
TraceDir = emqx_trace:trace_dir(),
|
||||
File = filename:join(TraceDir, FileName),
|
||||
ok = file:write_file(File, Content),
|
||||
{ok, Node, Bin} = emqx_trace:trace_file(FileName),
|
||||
?assertEqual(Node, atom_to_list(node())),
|
||||
?assertEqual(Content, Bin),
|
||||
ok = file:delete(File),
|
||||
ok.
|
||||
|
||||
t_download_log(_Config) ->
|
||||
emqx_trace:clear(),
|
||||
load(),
|
||||
ClientId = <<"client-test">>,
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
Name = <<"test_client_id">>,
|
||||
ok = emqx_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||
ct:sleep(100),
|
||||
{ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} =
|
||||
emqx_trace_api:download_zip_log(#{name => Name}, []),
|
||||
?assert(ZipFileSize > 0),
|
||||
ok = emqtt:disconnect(Client),
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
to_rfc3339(Second) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||
|
||||
load() ->
|
||||
emqx_trace:start_link().
|
||||
|
||||
unload() ->
|
||||
gen_server:stop(emqx_trace).
|
|
@ -41,7 +41,8 @@
|
|||
start_listeners() ->
|
||||
lists:foreach(fun(Listener) -> start_listener(Listener) end, listeners()).
|
||||
|
||||
%% Start HTTP Listener
|
||||
|
||||
%% Start HTTP(S) Listener
|
||||
start_listener({Proto, Port, Options}) ->
|
||||
Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}},
|
||||
{"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}},
|
||||
|
@ -88,7 +89,7 @@ listener_name(Proto) ->
|
|||
http_handlers() ->
|
||||
Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
|
||||
[{"/api/v4/",
|
||||
minirest:handler(#{apps => Plugins ++ [emqx_modules, emqx_plugin_libs],
|
||||
minirest:handler(#{apps => Plugins ++ [emqx_modules, emqx_plugin_libs],
|
||||
filter => fun ?MODULE:filter/1}),
|
||||
[{authorization, fun ?MODULE:is_authorized/1}]}].
|
||||
|
||||
|
|
|
@ -166,4 +166,3 @@ api_path(Path) ->
|
|||
|
||||
json(Data) ->
|
||||
{ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx.
|
||||
|
||||
|
|
|
@ -16,493 +16,24 @@
|
|||
|
||||
-module(emqx_mod_trace).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(emqx_gen_mod).
|
||||
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
-logger_header("[Trace]").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
||||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
-export([ load/1
|
||||
, unload/1
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-export([ start_link/0
|
||||
, list/0
|
||||
, list/1
|
||||
, get_trace_filename/1
|
||||
, create/1
|
||||
, delete/1
|
||||
, clear/0
|
||||
, update/2
|
||||
]).
|
||||
|
||||
-export([ format/1
|
||||
, zip_dir/0
|
||||
, trace_dir/0
|
||||
, trace_file/1
|
||||
, delete_files_after_send/2
|
||||
]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(TRACE, ?MODULE).
|
||||
-define(PACKETS, tuple_to_list(?TYPE_NAMES)).
|
||||
-define(MAX_SIZE, 30).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([log_file/2]).
|
||||
-endif.
|
||||
|
||||
-record(?TRACE,
|
||||
{ name :: binary() | undefined | '_'
|
||||
, type :: clientid | topic | undefined | '_'
|
||||
, topic :: emqx_types:topic() | undefined | '_'
|
||||
, clientid :: emqx_types:clientid() | undefined | '_'
|
||||
, packets = [] :: list() | '_'
|
||||
, enable = true :: boolean() | '_'
|
||||
, start_at :: integer() | undefined | binary() | '_'
|
||||
, end_at :: integer() | undefined | binary() | '_'
|
||||
, log_size = #{} :: map() | '_'
|
||||
}).
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = ekka_mnesia:create_table(?TRACE, [
|
||||
{type, set},
|
||||
{disc_copies, [node()]},
|
||||
{record_name, ?TRACE},
|
||||
{attributes, record_info(fields, ?TRACE)}]);
|
||||
mnesia(copy) ->
|
||||
ok = ekka_mnesia:copy_table(?TRACE, disc_copies).
|
||||
|
||||
-spec description() -> string().
|
||||
description() ->
|
||||
"EMQ X Trace Module".
|
||||
|
||||
-spec load(any()) -> ok.
|
||||
load(_Env) ->
|
||||
emqx_mod_sup:start_child(?MODULE, worker).
|
||||
emqx_mod_sup:start_child(emqx_trace, worker).
|
||||
|
||||
-spec unload(any()) -> ok.
|
||||
unload(_Env) ->
|
||||
_ = emqx_mod_sup:stop_child(?MODULE),
|
||||
stop_all_trace_handler().
|
||||
|
||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec list() -> [tuple()].
|
||||
list() ->
|
||||
ets:match_object(?TRACE, #?TRACE{_ = '_'}).
|
||||
|
||||
-spec list(boolean()) -> [tuple()].
|
||||
list(Enable) ->
|
||||
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
|
||||
|
||||
-spec create([{Key :: binary(), Value :: binary()}]) ->
|
||||
ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
|
||||
create(Trace) ->
|
||||
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
||||
true ->
|
||||
case to_trace(Trace) of
|
||||
{ok, TraceRec} -> create_new_trace(TraceRec);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
false ->
|
||||
{error, """The number of traces created has reached the maximum,
|
||||
please delete the useless ones first"""}
|
||||
end.
|
||||
|
||||
-spec delete(Name :: binary()) -> ok | {error, not_found}.
|
||||
delete(Name) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name) of
|
||||
[_] -> mnesia:delete(?TRACE, Name, write);
|
||||
[] -> mnesia:abort(not_found)
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec clear() -> ok | {error, Reason :: term()}.
|
||||
clear() ->
|
||||
case mnesia:clear_table(?TRACE) of
|
||||
{atomic, ok} -> ok;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
-spec update(Name :: binary(), Enable :: boolean()) ->
|
||||
ok | {error, not_found | finished}.
|
||||
update(Name, Enable) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name) of
|
||||
[] -> mnesia:abort(not_found);
|
||||
[#?TRACE{enable = Enable}] -> ok;
|
||||
[Rec] ->
|
||||
case erlang:system_time(second) >= Rec#?TRACE.end_at of
|
||||
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
|
||||
true -> mnesia:abort(finished)
|
||||
end
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec get_trace_filename(Name :: binary()) ->
|
||||
{ok, FileName :: string()} | {error, not_found}.
|
||||
get_trace_filename(Name) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Name, read) of
|
||||
[] -> mnesia:abort(not_found);
|
||||
[#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
|
||||
end end,
|
||||
transaction(Tran).
|
||||
|
||||
-spec trace_file(File :: list()) ->
|
||||
{ok, Node :: list(), Binary :: binary()} |
|
||||
{error, Node :: list(), Reason :: term()}.
|
||||
trace_file(File) ->
|
||||
FileName = filename:join(trace_dir(), File),
|
||||
Node = atom_to_list(node()),
|
||||
case file:read_file(FileName) of
|
||||
{ok, Bin} -> {ok, Node, Bin};
|
||||
{error, Reason} -> {error, Node, Reason}
|
||||
end.
|
||||
|
||||
delete_files_after_send(TraceLog, Zips) ->
|
||||
gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}).
|
||||
|
||||
-spec format(list(#?TRACE{})) -> list(map()).
|
||||
format(Traces) ->
|
||||
Fields = record_info(fields, ?TRACE),
|
||||
lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) ->
|
||||
Trace = Trace0#?TRACE{
|
||||
start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)),
|
||||
end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt))
|
||||
},
|
||||
[_ | Values] = tuple_to_list(Trace),
|
||||
maps:from_list(lists:zip(Fields, Values))
|
||||
end, Traces).
|
||||
|
||||
init([]) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
||||
ok = filelib:ensure_dir(trace_dir()),
|
||||
ok = filelib:ensure_dir(zip_dir()),
|
||||
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
||||
Traces = get_enable_trace(),
|
||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||
TRef = update_trace(Traces),
|
||||
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) ->
|
||||
erlang:monitor(process, Pid),
|
||||
{noreply, State#{monitors => Monitors#{Pid => Files}}};
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
|
||||
case maps:take(Pid, Monitors) of
|
||||
error -> {noreply, State};
|
||||
{Files, NewMonitors} ->
|
||||
lists:foreach(fun(F) -> file:delete(F) end, Files),
|
||||
{noreply, State#{monitors => NewMonitors}}
|
||||
end;
|
||||
handle_info({timeout, TRef, update_trace},
|
||||
#{timer := TRef, primary_log_level := OriginLogLevel} = State) ->
|
||||
Traces = get_enable_trace(),
|
||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||
NextTRef = update_trace(Traces),
|
||||
{noreply, State#{timer => NextTRef}};
|
||||
|
||||
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
|
||||
emqx_misc:cancel_timer(TRef),
|
||||
handle_info({timeout, TRef, update_trace}, State);
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
|
||||
ok = set_log_primary_level(OriginLogLevel),
|
||||
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
|
||||
emqx_misc:cancel_timer(TRef),
|
||||
stop_all_trace_handler(),
|
||||
_ = file:del_dir_r(zip_dir()),
|
||||
ok.
|
||||
|
||||
code_change(_, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
create_new_trace(Trace) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Trace#?TRACE.name) of
|
||||
[] ->
|
||||
#?TRACE{start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets} = Trace,
|
||||
Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets},
|
||||
case mnesia:match_object(?TRACE, Match, read) of
|
||||
[] -> mnesia:write(?TRACE, Trace, write);
|
||||
[#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
|
||||
end;
|
||||
[#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name})
|
||||
end
|
||||
end,
|
||||
transaction(Tran).
|
||||
|
||||
update_trace(Traces) ->
|
||||
Now = erlang:system_time(second),
|
||||
{_Waiting, Running, Finished} = classify_by_time(Traces, Now),
|
||||
disable_finished(Finished),
|
||||
Started = already_running(),
|
||||
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
||||
NeedStop = AllStarted -- NeedRunning,
|
||||
ok = stop_trace(NeedStop, Started),
|
||||
clean_stale_trace_files(),
|
||||
NextTime = find_closest_time(Traces, Now),
|
||||
emqx_misc:start_timer(NextTime, update_trace).
|
||||
|
||||
stop_all_trace_handler() ->
|
||||
lists:foreach(fun(#{type := Type, name := Name} = Trace) ->
|
||||
_ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name)
|
||||
end
|
||||
, already_running()).
|
||||
|
||||
already_running() ->
|
||||
emqx_tracer:lookup_traces().
|
||||
|
||||
get_enable_trace() ->
|
||||
{atomic, Traces} =
|
||||
mnesia:transaction(fun() ->
|
||||
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
||||
end),
|
||||
Traces.
|
||||
|
||||
find_closest_time(Traces, Now) ->
|
||||
Sec =
|
||||
lists:foldl(
|
||||
fun(#?TRACE{start_at = Start, end_at = End}, Closest)
|
||||
when Start >= Now andalso Now < End -> %% running
|
||||
min(End - Now, Closest);
|
||||
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
|
||||
min(Now - Start, Closest);
|
||||
(_, Closest) -> Closest %% finished
|
||||
end, 60 * 15, Traces),
|
||||
timer:seconds(Sec).
|
||||
|
||||
disable_finished([]) -> ok;
|
||||
disable_finished(Traces) ->
|
||||
NameWithLogSize =
|
||||
lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) ->
|
||||
FileSize = filelib:file_size(log_file(Name, StartAt)),
|
||||
{Name, FileSize} end, Traces),
|
||||
transaction(fun() ->
|
||||
lists:map(fun({Name, LogSize}) ->
|
||||
case mnesia:read(?TRACE, Name, write) of
|
||||
[] -> ok;
|
||||
[Trace = #?TRACE{log_size = Logs}] ->
|
||||
mnesia:write(?TRACE, Trace#?TRACE{enable = false,
|
||||
log_size = Logs#{node() => LogSize}}, write)
|
||||
end end, NameWithLogSize)
|
||||
end).
|
||||
|
||||
start_trace(Traces, Started0) ->
|
||||
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
|
||||
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
|
||||
case lists:member(Name, StartedAcc) of
|
||||
true -> {[Name | Running], StartedAcc};
|
||||
false ->
|
||||
case start_trace(Trace) of
|
||||
ok -> {[Name | Running], [Name | StartedAcc]};
|
||||
Error ->
|
||||
?LOG(error, "(~p)start trace failed by:~p", [Name, Error]),
|
||||
{[Name | Running], StartedAcc}
|
||||
end
|
||||
end
|
||||
end, {[], Started}, Traces).
|
||||
|
||||
start_trace(Trace) ->
|
||||
#?TRACE{name = Name
|
||||
, type = Type
|
||||
, clientid = ClientId
|
||||
, topic = Topic
|
||||
, packets = Packets
|
||||
, start_at = Start
|
||||
} = Trace,
|
||||
Who0 = #{name => Name, labels => Packets},
|
||||
Who =
|
||||
case Type of
|
||||
topic -> Who0#{type => topic, topic => Topic};
|
||||
clientid -> Who0#{type => clientid, clientid => ClientId}
|
||||
end,
|
||||
case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of
|
||||
ok -> ok;
|
||||
{error, {already_exist, _}} -> ok;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
stop_trace(Finished, Started) ->
|
||||
lists:foreach(fun(#{name := Name, type := Type} = Trace) ->
|
||||
case lists:member(Name, Finished) of
|
||||
true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name);
|
||||
false -> ok
|
||||
end
|
||||
end, Started).
|
||||
|
||||
clean_stale_trace_files() ->
|
||||
TraceDir = trace_dir(),
|
||||
case file:list_dir(TraceDir) of
|
||||
{ok, AllFiles} when AllFiles =/= ["zip"] ->
|
||||
FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end,
|
||||
KeepFiles = lists:map(FileFun, list()),
|
||||
case AllFiles -- ["zip" | KeepFiles] of
|
||||
[] -> ok;
|
||||
DeleteFiles ->
|
||||
DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end,
|
||||
lists:foreach(DelFun, DeleteFiles)
|
||||
end;
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
classify_by_time(Traces, Now) ->
|
||||
classify_by_time(Traces, Now, [], [], []).
|
||||
|
||||
classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish};
|
||||
classify_by_time([Trace = #?TRACE{start_at = Start} | Traces],
|
||||
Now, Wait, Run, Finish) when Start > Now ->
|
||||
classify_by_time(Traces, Now, [Trace | Wait], Run, Finish);
|
||||
classify_by_time([Trace = #?TRACE{end_at = End} | Traces],
|
||||
Now, Wait, Run, Finish) when End =< Now ->
|
||||
classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]);
|
||||
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
|
||||
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
||||
|
||||
to_trace(TraceList) ->
|
||||
case to_trace(TraceList, #?TRACE{}) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
{ok, #?TRACE{name = undefined}} ->
|
||||
{error, "name required"};
|
||||
{ok, #?TRACE{type = undefined}} ->
|
||||
{error, "type required"};
|
||||
{ok, #?TRACE{topic = undefined, clientid = undefined}} ->
|
||||
{error, "topic/clientid cannot be both empty"};
|
||||
{ok, Trace} ->
|
||||
case fill_default(Trace) of
|
||||
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
||||
{error, "failed by start_at >= end_at"};
|
||||
Trace1 -> {ok, Trace1}
|
||||
end
|
||||
end.
|
||||
|
||||
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
||||
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
||||
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
||||
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
|
||||
fill_default(Trace) -> Trace.
|
||||
|
||||
to_trace([], Rec) -> {ok, Rec};
|
||||
to_trace([{<<"name">>, Name} | Trace], Rec) ->
|
||||
case binary:match(Name, [<<"/">>], []) of
|
||||
nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
|
||||
_ -> {error, "name cannot contain /"}
|
||||
end;
|
||||
to_trace([{<<"type">>, Type} | Trace], Rec) ->
|
||||
case lists:member(Type, [<<"clientid">>, <<"topic">>]) of
|
||||
true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
|
||||
false -> {error, "incorrect type: only support clientid/topic"}
|
||||
end;
|
||||
to_trace([{<<"topic">>, Topic} | Trace], Rec) ->
|
||||
case validate_topic(Topic) of
|
||||
ok -> to_trace(Trace, Rec#?TRACE{topic = Topic});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"start_at">>, StartAt} | Trace], Rec) ->
|
||||
case to_system_second(StartAt) of
|
||||
{ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"end_at">>, EndAt} | Trace], Rec) ->
|
||||
Now = erlang:system_time(second),
|
||||
case to_system_second(EndAt) of
|
||||
{ok, Sec} when Sec > Now ->
|
||||
to_trace(Trace, Rec#?TRACE{end_at = Sec});
|
||||
{ok, _Sec} ->
|
||||
{error, "end_at time has already passed"};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
to_trace([{<<"clientid">>, ClientId} | Trace], Rec) ->
|
||||
to_trace(Trace, Rec#?TRACE{clientid = ClientId});
|
||||
to_trace([{<<"packets">>, PacketList} | Trace], Rec) ->
|
||||
case to_packets(PacketList) of
|
||||
{ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets});
|
||||
{error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])}
|
||||
end;
|
||||
to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
|
||||
|
||||
validate_topic(TopicName) ->
|
||||
try emqx_topic:validate(name, TopicName) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error ->
|
||||
{error, io_lib:format("~s invalid by ~p", [TopicName, Error])}
|
||||
end.
|
||||
|
||||
to_system_second(At) ->
|
||||
try
|
||||
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
|
||||
{ok, Sec}
|
||||
catch error: {badmatch, _} ->
|
||||
{error, ["The rfc3339 specification not satisfied: ", At]}
|
||||
end.
|
||||
|
||||
to_packets(Packets) when is_list(Packets) ->
|
||||
AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets),
|
||||
case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of
|
||||
[] -> {ok, AtomTypes};
|
||||
InvalidE -> {error, InvalidE}
|
||||
end;
|
||||
to_packets(Packets) -> {error, Packets}.
|
||||
|
||||
zip_dir() ->
|
||||
trace_dir() ++ "zip/".
|
||||
|
||||
trace_dir() ->
|
||||
filename:join(emqx:get_env(data_dir), "trace") ++ "/".
|
||||
|
||||
log_file(Name, Start) ->
|
||||
filename:join(trace_dir(), filename(Name, Start)).
|
||||
|
||||
filename(Name, Start) ->
|
||||
[Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
|
||||
lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
|
||||
|
||||
transaction(Tran) ->
|
||||
case mnesia:transaction(Tran) of
|
||||
{atomic, Res} -> Res;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel);
|
||||
update_log_primary_level(_, _) -> set_log_primary_level(debug).
|
||||
|
||||
set_log_primary_level(NewLevel) ->
|
||||
case NewLevel =/= emqx_logger:get_primary_log_level() of
|
||||
true -> emqx_logger:set_primary_log_level(NewLevel);
|
||||
false -> ok
|
||||
end.
|
||||
emqx_mod_sup:stop_child(emqx_trace).
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_mod_trace_api).
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([ list_trace/2
|
||||
|
@ -26,10 +25,6 @@
|
|||
, download_zip_log/2
|
||||
, stream_log_file/2
|
||||
]).
|
||||
-export([read_trace_file/3]).
|
||||
|
||||
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
||||
-define(RETURN_NOT_FOUND(N), return({error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])})).
|
||||
|
||||
-import(minirest, [return/1]).
|
||||
|
||||
|
@ -75,117 +70,26 @@
|
|||
func => stream_log_file,
|
||||
descr => "download trace's log"}).
|
||||
|
||||
list_trace(_, Params) ->
|
||||
List =
|
||||
case Params of
|
||||
[{<<"enable">>, Enable}] -> emqx_mod_trace:list(Enable);
|
||||
_ -> emqx_mod_trace:list()
|
||||
end,
|
||||
return({ok, emqx_mod_trace:format(List)}).
|
||||
list_trace(Path, Params) ->
|
||||
return(emqx_trace_api:list_trace(Path, Params)).
|
||||
|
||||
create_trace(_, Param) ->
|
||||
case emqx_mod_trace:create(Param) of
|
||||
ok -> return(ok);
|
||||
{error, {already_existed, Name}} ->
|
||||
return({error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])});
|
||||
{error, {duplicate_condition, Name}} ->
|
||||
return({error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])});
|
||||
{error, Reason} ->
|
||||
return({error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)})
|
||||
create_trace(Path, Params) ->
|
||||
return(emqx_trace_api:create_trace(Path, Params)).
|
||||
|
||||
delete_trace(Path, Params) ->
|
||||
return(emqx_trace_api:delete_trace(Path, Params)).
|
||||
|
||||
clear_traces(Path, Params) ->
|
||||
return(emqx_trace_api:clear_traces(Path, Params)).
|
||||
|
||||
update_trace(Path, Params) ->
|
||||
return(emqx_trace_api:update_trace(Path, Params)).
|
||||
|
||||
download_zip_log(Path, Params) ->
|
||||
case emqx_trace_api:download_zip_log(Path, Params) of
|
||||
{ok, _Header, _File}= Return -> Return;
|
||||
{error, _Reason} = Err -> return(Err)
|
||||
end.
|
||||
|
||||
delete_trace(#{name := Name}, _Param) ->
|
||||
case emqx_mod_trace:delete(Name) of
|
||||
ok -> return(ok);
|
||||
{error, not_found} -> ?RETURN_NOT_FOUND(Name)
|
||||
end.
|
||||
|
||||
clear_traces(_, _) ->
|
||||
return(emqx_mod_trace:clear()).
|
||||
|
||||
update_trace(#{name := Name, operation := Operation}, _Param) ->
|
||||
Enable = case Operation of disable -> false; enable -> true end,
|
||||
case emqx_mod_trace:update(Name, Enable) of
|
||||
ok -> return({ok, #{enable => Enable, name => Name}});
|
||||
{error, not_found} -> ?RETURN_NOT_FOUND(Name)
|
||||
end.
|
||||
|
||||
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
||||
%% cowboy_compress_h will auto encode gzip format.
|
||||
download_zip_log(#{name := Name}, _Param) ->
|
||||
case emqx_mod_trace:get_trace_filename(Name) of
|
||||
{ok, TraceLog} ->
|
||||
TraceFiles = collect_trace_file(TraceLog),
|
||||
ZipDir = emqx_mod_trace:zip_dir(),
|
||||
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
||||
ZipFileName = ZipDir ++ TraceLog,
|
||||
{ok, ZipFile} = zip:zip(ZipFileName, Zips),
|
||||
emqx_mod_trace:delete_files_after_send(ZipFileName, Zips),
|
||||
{ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}};
|
||||
{error, Reason} ->
|
||||
return({error, Reason})
|
||||
end.
|
||||
|
||||
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
||||
lists:foldl(fun(Res, Acc) ->
|
||||
case Res of
|
||||
{ok, Node, Bin} ->
|
||||
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
||||
ok = file:write_file(ZipName, Bin),
|
||||
[ZipName | Acc];
|
||||
{error, Node, Reason} ->
|
||||
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
||||
Acc
|
||||
end
|
||||
end, [], TraceFiles).
|
||||
|
||||
collect_trace_file(TraceLog) ->
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
{Files, BadNodes} = rpc:multicall(Nodes, emqx_mod_trace, trace_file, [TraceLog], 60000),
|
||||
BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]),
|
||||
Files.
|
||||
|
||||
%% _page as position and _limit as bytes for front-end reusing components
|
||||
stream_log_file(#{name := Name}, Params) ->
|
||||
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
|
||||
Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>),
|
||||
Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>),
|
||||
Node = binary_to_existing_atom(Node0),
|
||||
Position = binary_to_integer(Position0),
|
||||
Bytes = binary_to_integer(Bytes0),
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
{ok, Bin} ->
|
||||
Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes},
|
||||
return({ok, #{meta => Meta, items => Bin}});
|
||||
eof ->
|
||||
Meta = #{<<"page">> => Position, <<"limit">> => Bytes},
|
||||
return({ok, #{meta => Meta, items => <<"">>}});
|
||||
{error, Reason} ->
|
||||
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
|
||||
return({error, Reason})
|
||||
end.
|
||||
|
||||
%% this is an rpc call for stream_log_file/2
|
||||
read_trace_file(Name, Position, Limit) ->
|
||||
TraceDir = emqx_mod_trace:trace_dir(),
|
||||
{ok, AllFiles} = file:list_dir(TraceDir),
|
||||
TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_",
|
||||
Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end,
|
||||
case lists:filter(Filter, AllFiles) of
|
||||
[TraceFile] ->
|
||||
TracePath = filename:join([TraceDir, TraceFile]),
|
||||
read_file(TracePath, Position, Limit);
|
||||
[] -> {error, not_found}
|
||||
end.
|
||||
|
||||
read_file(Path, Offset, Bytes) ->
|
||||
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
|
||||
try
|
||||
_ = case Offset of
|
||||
0 -> ok;
|
||||
_ -> file:position(IoDevice, {bof, Offset})
|
||||
end,
|
||||
file:read(IoDevice, Bytes)
|
||||
after
|
||||
file:close(IoDevice)
|
||||
end.
|
||||
stream_log_file(Path, Params) ->
|
||||
return(emqx_trace_api:stream_log_file(Path, Params)).
|
||||
|
|
|
@ -1,478 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_mod_trace_SUITE).
|
||||
|
||||
%% API
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
-define(HOST, "http://127.0.0.1:18083/").
|
||||
-define(API_VERSION, "v4").
|
||||
-define(BASE_PATH, "api").
|
||||
|
||||
-record(emqx_mod_trace, {
|
||||
name,
|
||||
type,
|
||||
topic,
|
||||
clientid,
|
||||
packets = [],
|
||||
enable = true,
|
||||
start_at,
|
||||
end_at,
|
||||
log_size = #{}
|
||||
}).
|
||||
|
||||
-define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL'
|
||||
, 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK'
|
||||
, 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([emqx_modules, emqx_dashboard]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_modules, emqx_dashboard]).
|
||||
|
||||
t_base_create_delete(_Config) ->
|
||||
ok = emqx_mod_trace:clear(),
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
End = to_rfc3339(Now + 30 * 60),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"start_at">>, Start},
|
||||
{<<"end_at">>, End}
|
||||
],
|
||||
AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}),
|
||||
ok = emqx_mod_trace:create(Trace),
|
||||
?assertEqual({error, {already_existed, Name}}, emqx_mod_trace:create(Trace)),
|
||||
?assertEqual({error, {duplicate_condition, Name}}, emqx_mod_trace:create(AnotherTrace)),
|
||||
[TraceRec] = emqx_mod_trace:list(),
|
||||
Expect = #emqx_mod_trace{
|
||||
name = Name,
|
||||
type = clientid,
|
||||
topic = undefined,
|
||||
clientid = ClientId,
|
||||
packets = ?PACKETS,
|
||||
start_at = Now,
|
||||
end_at = Now + 30 * 60
|
||||
},
|
||||
?assertEqual(Expect, TraceRec),
|
||||
ExpectFormat = [
|
||||
#{
|
||||
clientid => <<"test-device">>,
|
||||
enable => true,
|
||||
type => clientid,
|
||||
packets => ?PACKETS,
|
||||
name => <<"name1">>,
|
||||
start_at => Start,
|
||||
end_at => End,
|
||||
log_size => #{},
|
||||
topic => undefined
|
||||
}
|
||||
],
|
||||
?assertEqual(ExpectFormat, emqx_mod_trace:format([TraceRec])),
|
||||
?assertEqual(ok, emqx_mod_trace:delete(Name)),
|
||||
?assertEqual({error, not_found}, emqx_mod_trace:delete(Name)),
|
||||
?assertEqual([], emqx_mod_trace:list()),
|
||||
ok.
|
||||
|
||||
t_create_size_max(_Config) ->
|
||||
emqx_mod_trace:clear(),
|
||||
lists:map(fun(Seq) ->
|
||||
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||
Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||
ok = emqx_mod_trace:create(Trace)
|
||||
end, lists:seq(1, 30)),
|
||||
Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}],
|
||||
{error, _} = emqx_mod_trace:create(Trace31),
|
||||
ok = emqx_mod_trace:delete(<<"name30">>),
|
||||
ok = emqx_mod_trace:create(Trace31),
|
||||
?assertEqual(30, erlang:length(emqx_mod_trace:list())),
|
||||
ok.
|
||||
|
||||
t_create_failed(_Config) ->
|
||||
ok = emqx_mod_trace:clear(),
|
||||
UnknownField = [{<<"unknown">>, 12}],
|
||||
{error, Reason1} = emqx_mod_trace:create(UnknownField),
|
||||
?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)),
|
||||
|
||||
InvalidTopic = [{<<"topic">>, "#/#//"}],
|
||||
{error, Reason2} = emqx_mod_trace:create(InvalidTopic),
|
||||
?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||
|
||||
InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
|
||||
{error, Reason3} = emqx_mod_trace:create(InvalidStart),
|
||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||
iolist_to_binary(Reason3)),
|
||||
|
||||
InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}],
|
||||
{error, Reason4} = emqx_mod_trace:create(InvalidEnd),
|
||||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||
iolist_to_binary(Reason4)),
|
||||
|
||||
InvalidPackets = [{<<"packets">>, [<<"publish">>]}],
|
||||
{error, Reason5} = emqx_mod_trace:create(InvalidPackets),
|
||||
?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)),
|
||||
|
||||
InvalidPackets2 = [{<<"packets">>, <<"publish">>}],
|
||||
{error, Reason6} = emqx_mod_trace:create(InvalidPackets2),
|
||||
?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)),
|
||||
|
||||
{error, Reason7} = emqx_mod_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
|
||||
?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)),
|
||||
|
||||
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}],
|
||||
{error, Reason9} = emqx_mod_trace:create(InvalidPackets4),
|
||||
?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
|
||||
|
||||
?assertEqual({error, "type required"}, emqx_mod_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}])),
|
||||
|
||||
?assertEqual({error, "incorrect type: only support clientid/topic"},
|
||||
emqx_mod_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
||||
ok.
|
||||
|
||||
t_create_default(_Config) ->
|
||||
ok = emqx_mod_trace:clear(),
|
||||
{error, "name required"} = emqx_mod_trace:create([]),
|
||||
ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]),
|
||||
[#emqx_mod_trace{packets = Packets}] = emqx_mod_trace:list(),
|
||||
?assertEqual([], Packets),
|
||||
ok = emqx_mod_trace:clear(),
|
||||
Trace = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
||||
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
|
||||
],
|
||||
{error, "end_at time has already passed"} = emqx_mod_trace:create(Trace),
|
||||
Now = erlang:system_time(second),
|
||||
Trace2 = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, to_rfc3339(Now + 10)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 3)}
|
||||
],
|
||||
{error, "failed by start_at >= end_at"} = emqx_mod_trace:create(Trace2),
|
||||
ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||
[#emqx_mod_trace{start_at = Start, end_at = End}] = emqx_mod_trace:list(),
|
||||
?assertEqual(10 * 60, End - Start),
|
||||
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
||||
ok.
|
||||
|
||||
t_update_enable(_Config) ->
|
||||
ok = emqx_mod_trace:clear(),
|
||||
Name = <<"test-name">>,
|
||||
Now = erlang:system_time(second),
|
||||
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||
ok = emqx_mod_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||
[#emqx_mod_trace{enable = Enable}] = emqx_mod_trace:list(),
|
||||
?assertEqual(Enable, true),
|
||||
ok = emqx_mod_trace:update(Name, false),
|
||||
[#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(),
|
||||
ok = emqx_mod_trace:update(Name, false),
|
||||
[#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(),
|
||||
ok = emqx_mod_trace:update(Name, true),
|
||||
[#emqx_mod_trace{enable = true}] = emqx_mod_trace:list(),
|
||||
ok = emqx_mod_trace:update(Name, false),
|
||||
[#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(),
|
||||
?assertEqual({error, not_found}, emqx_mod_trace:update(<<"Name not found">>, true)),
|
||||
ct:sleep(2100),
|
||||
?assertEqual({error, finished}, emqx_mod_trace:update(Name, true)),
|
||||
ok.
|
||||
|
||||
t_load_state(_Config) ->
|
||||
emqx_mod_trace:clear(),
|
||||
ok = emqx_mod_trace:load(test),
|
||||
Now = erlang:system_time(second),
|
||||
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, {<<"end_at">>, to_rfc3339(Now + 2)}],
|
||||
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"end_at">>, to_rfc3339(Now + 8)}],
|
||||
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"end_at">>, to_rfc3339(Now)}],
|
||||
ok = emqx_mod_trace:create(Running),
|
||||
ok = emqx_mod_trace:create(Waiting),
|
||||
{error, "end_at time has already passed"} = emqx_mod_trace:create(Finished),
|
||||
Traces = emqx_mod_trace:format(emqx_mod_trace:list()),
|
||||
?assertEqual(2, erlang:length(Traces)),
|
||||
Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces),
|
||||
ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}],
|
||||
?assertEqual(ExpectEnables, lists:sort(Enables)),
|
||||
ct:sleep(3500),
|
||||
Traces2 = emqx_mod_trace:format(emqx_mod_trace:list()),
|
||||
?assertEqual(2, erlang:length(Traces2)),
|
||||
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
||||
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
||||
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
||||
ok = emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
t_client_event(_Config) ->
|
||||
application:set_env(emqx, allow_anonymous, true),
|
||||
emqx_mod_trace:clear(),
|
||||
ClientId = <<"client-test">>,
|
||||
ok = emqx_mod_trace:load(test),
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
Name = <<"test_client_id_event">>,
|
||||
ok = emqx_mod_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
emqtt:ping(Client),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
||||
ct:sleep(200),
|
||||
ok = emqx_mod_trace:create([{<<"name">>, <<"test_topic">>},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Bin} = file:read_file(emqx_mod_trace:log_file(Name, Now)),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
||||
ok = emqtt:disconnect(Client),
|
||||
ct:sleep(200),
|
||||
{ok, Bin2} = file:read_file(emqx_mod_trace:log_file(Name, Now)),
|
||||
{ok, Bin3} = file:read_file(emqx_mod_trace:log_file(<<"test_topic">>, Now)),
|
||||
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
||||
?assert(erlang:byte_size(Bin) > 0),
|
||||
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
||||
?assert(erlang:byte_size(Bin3) > 0),
|
||||
ok = emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
t_get_log_filename(_Config) ->
|
||||
ok = emqx_mod_trace:clear(),
|
||||
ok = emqx_mod_trace:load(test),
|
||||
Now = erlang:system_time(second),
|
||||
Start = calendar:system_time_to_rfc3339(Now),
|
||||
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"start_at">>, list_to_binary(Start)},
|
||||
{<<"end_at">>, list_to_binary(End)}
|
||||
],
|
||||
ok = emqx_mod_trace:create(Trace),
|
||||
?assertEqual({error, not_found}, emqx_mod_trace:get_trace_filename(<<"test">>)),
|
||||
?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))),
|
||||
ct:sleep(3000),
|
||||
?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))),
|
||||
ok = emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
t_trace_file(_Config) ->
|
||||
FileName = "test.log",
|
||||
Content = <<"test \n test">>,
|
||||
TraceDir = emqx_mod_trace:trace_dir(),
|
||||
File = filename:join(TraceDir, FileName),
|
||||
ok = file:write_file(File, Content),
|
||||
{ok, Node, Bin} = emqx_mod_trace:trace_file(FileName),
|
||||
?assertEqual(Node, atom_to_list(node())),
|
||||
?assertEqual(Content, Bin),
|
||||
ok = file:delete(File),
|
||||
ok.
|
||||
|
||||
t_http_test(_Config) ->
|
||||
emqx_mod_trace:clear(),
|
||||
emqx_mod_trace:load(test),
|
||||
Header = auth_header_(),
|
||||
%% list
|
||||
{ok, Empty} = request_api(get, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(Empty)),
|
||||
%% create
|
||||
ErrorTrace = #{},
|
||||
{ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace),
|
||||
?assertEqual(#{<<"message">> => <<"unknown field: {}">>, <<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)),
|
||||
|
||||
Name = <<"test-name">>,
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>}
|
||||
],
|
||||
|
||||
{ok, Create} = request_api(post, api_path("trace"), Header, Trace),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Create)),
|
||||
|
||||
{ok, List} = request_api(get, api_path("trace"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := [Data]} = json(List),
|
||||
?assertEqual(Name, maps:get(<<"name">>, Data)),
|
||||
|
||||
%% update
|
||||
{ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}),
|
||||
?assertEqual(#{<<"code">> => 0,
|
||||
<<"data">> => #{<<"enable">> => false,
|
||||
<<"name">> => <<"test-name">>}}, json(Update)),
|
||||
|
||||
{ok, List1} = request_api(get, api_path("trace"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := [Data1]} = json(List1),
|
||||
?assertEqual(false, maps:get(<<"enable">>, Data1)),
|
||||
|
||||
%% delete
|
||||
{ok, Delete} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Delete)),
|
||||
|
||||
{ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
?assertEqual(#{<<"code">> => <<"NOT_FOUND">>,
|
||||
<<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)),
|
||||
|
||||
{ok, List2} = request_api(get, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)),
|
||||
|
||||
%% clear
|
||||
{ok, Create1} = request_api(post, api_path("trace"), Header, Trace),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Create1)),
|
||||
|
||||
{ok, Clear} = request_api(delete, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Clear)),
|
||||
|
||||
emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
t_download_log(_Config) ->
|
||||
emqx_mod_trace:clear(),
|
||||
emqx_mod_trace:load(test),
|
||||
ClientId = <<"client-test">>,
|
||||
Now = erlang:system_time(second),
|
||||
Start = to_rfc3339(Now),
|
||||
Name = <<"test_client_id">>,
|
||||
ok = emqx_mod_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||
ct:sleep(100),
|
||||
{ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} =
|
||||
emqx_mod_trace_api:download_zip_log(#{name => Name}, []),
|
||||
?assert(ZipFileSize > 0),
|
||||
%% download zip file failed by server_closed occasionally?
|
||||
%Header = auth_header_(),
|
||||
%{ok, ZipBin} = request_api(get, api_path("trace/test_client_id/download"), Header),
|
||||
%{ok, ZipHandler} = zip:zip_open(ZipBin),
|
||||
%{ok, [ZipName]} = zip:zip_get(ZipHandler),
|
||||
%?assertNotEqual(nomatch, string:find(ZipName, "test@127.0.0.1")),
|
||||
%{ok, _} = file:read_file(emqx_mod_trace:log_file(<<"test_client_id">>, Now)),
|
||||
ok = emqtt:disconnect(Client),
|
||||
emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
t_stream_log(_Config) ->
|
||||
application:set_env(emqx, allow_anonymous, true),
|
||||
emqx_mod_trace:clear(),
|
||||
emqx_mod_trace:load(test),
|
||||
ClientId = <<"client-stream">>,
|
||||
Now = erlang:system_time(second),
|
||||
Name = <<"test_stream_log">>,
|
||||
Start = to_rfc3339(Now - 10),
|
||||
ok = emqx_mod_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]),
|
||||
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]),
|
||||
ok = emqtt:disconnect(Client),
|
||||
ct:sleep(200),
|
||||
File = emqx_mod_trace:log_file(Name, Now),
|
||||
ct:pal("FileName: ~p", [File]),
|
||||
{ok, FileBin} = file:read_file(File),
|
||||
ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
|
||||
Header = auth_header_(),
|
||||
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary),
|
||||
?assertEqual(10, byte_size(Bin)),
|
||||
?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta),
|
||||
{ok, Binary1} = request_api(get, api_path("trace/test_stream_log/log?_page=20&_limit=10"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1),
|
||||
?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1),
|
||||
?assertEqual(10, byte_size(Bin1)),
|
||||
emqx_mod_trace:unload(test),
|
||||
ok.
|
||||
|
||||
to_rfc3339(Second) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||
|
||||
auth_header_() ->
|
||||
auth_header_("admin", "public").
|
||||
|
||||
auth_header_(User, Pass) ->
|
||||
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
|
||||
{"Authorization", "Basic " ++ Encoded}.
|
||||
|
||||
request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}).
|
||||
|
||||
request_api(Method, Url, Auth, Body) ->
|
||||
Request = {Url, [Auth], "application/json", emqx_json:encode(Body)},
|
||||
do_request_api(Method, Request).
|
||||
|
||||
do_request_api(Method, Request) ->
|
||||
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||
case httpc:request(Method, Request, [], [{body_format, binary}]) of
|
||||
{error, socket_closed_remotely} ->
|
||||
{error, socket_closed_remotely};
|
||||
{error,{shutdown, server_closed}} ->
|
||||
{error, server_closed};
|
||||
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} }
|
||||
when Code =:= 200 orelse Code =:= 201 ->
|
||||
{ok, Return};
|
||||
{ok, {Reason, _, _}} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
api_path(Path) ->
|
||||
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]).
|
||||
|
||||
json(Data) ->
|
||||
{ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx.
|
|
@ -0,0 +1,179 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_mod_trace_api_SUITE).
|
||||
|
||||
%% API
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
-define(HOST, "http://127.0.0.1:18083/").
|
||||
-define(API_VERSION, "v4").
|
||||
-define(BASE_PATH, "api").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_plugin_libs),
|
||||
emqx_ct_helpers:start_apps([emqx_modules, emqx_dashboard]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_modules, emqx_dashboard]).
|
||||
|
||||
t_http_test(_Config) ->
|
||||
emqx_trace:clear(),
|
||||
load(),
|
||||
Header = auth_header_(),
|
||||
%% list
|
||||
{ok, Empty} = request_api(get, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(Empty)),
|
||||
%% create
|
||||
ErrorTrace = #{},
|
||||
{ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace),
|
||||
?assertEqual(#{<<"message">> => <<"unknown field: {}">>,
|
||||
<<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)),
|
||||
|
||||
Name = <<"test-name">>,
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>}
|
||||
],
|
||||
|
||||
{ok, Create} = request_api(post, api_path("trace"), Header, Trace),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Create)),
|
||||
|
||||
{ok, List} = request_api(get, api_path("trace"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := [Data]} = json(List),
|
||||
?assertEqual(Name, maps:get(<<"name">>, Data)),
|
||||
|
||||
%% update
|
||||
{ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}),
|
||||
?assertEqual(#{<<"code">> => 0,
|
||||
<<"data">> => #{<<"enable">> => false,
|
||||
<<"name">> => <<"test-name">>}}, json(Update)),
|
||||
|
||||
{ok, List1} = request_api(get, api_path("trace"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := [Data1]} = json(List1),
|
||||
?assertEqual(false, maps:get(<<"enable">>, Data1)),
|
||||
|
||||
%% delete
|
||||
{ok, Delete} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Delete)),
|
||||
|
||||
{ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
?assertEqual(#{<<"code">> => <<"NOT_FOUND">>,
|
||||
<<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)),
|
||||
|
||||
{ok, List2} = request_api(get, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)),
|
||||
|
||||
%% clear
|
||||
{ok, Create1} = request_api(post, api_path("trace"), Header, Trace),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Create1)),
|
||||
|
||||
{ok, Clear} = request_api(delete, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0}, json(Clear)),
|
||||
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
t_stream_log(_Config) ->
|
||||
application:set_env(emqx, allow_anonymous, true),
|
||||
emqx_trace:clear(),
|
||||
load(),
|
||||
ClientId = <<"client-stream">>,
|
||||
Now = erlang:system_time(second),
|
||||
Name = <<"test_stream_log">>,
|
||||
Start = to_rfc3339(Now - 10),
|
||||
ok = emqx_trace:create([{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||
ct:sleep(200),
|
||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]),
|
||||
emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]),
|
||||
ok = emqtt:disconnect(Client),
|
||||
ct:sleep(200),
|
||||
File = emqx_trace:log_file(Name, Now),
|
||||
ct:pal("FileName: ~p", [File]),
|
||||
{ok, FileBin} = file:read_file(File),
|
||||
ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
|
||||
Header = auth_header_(),
|
||||
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary),
|
||||
?assertEqual(10, byte_size(Bin)),
|
||||
?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta),
|
||||
Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"),
|
||||
{ok, Binary1} = request_api(get, Path, Header),
|
||||
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1),
|
||||
?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1),
|
||||
?assertEqual(10, byte_size(Bin1)),
|
||||
unload(),
|
||||
ok.
|
||||
|
||||
to_rfc3339(Second) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||
|
||||
auth_header_() ->
|
||||
auth_header_("admin", "public").
|
||||
|
||||
auth_header_(User, Pass) ->
|
||||
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
|
||||
{"Authorization", "Basic " ++ Encoded}.
|
||||
|
||||
request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}).
|
||||
|
||||
request_api(Method, Url, Auth, Body) ->
|
||||
Request = {Url, [Auth], "application/json", emqx_json:encode(Body)},
|
||||
do_request_api(Method, Request).
|
||||
|
||||
do_request_api(Method, Request) ->
|
||||
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||
case httpc:request(Method, Request, [], [{body_format, binary}]) of
|
||||
{error, socket_closed_remotely} ->
|
||||
{error, socket_closed_remotely};
|
||||
{error,{shutdown, server_closed}} ->
|
||||
{error, server_closed};
|
||||
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} }
|
||||
when Code =:= 200 orelse Code =:= 201 ->
|
||||
{ok, Return};
|
||||
{ok, {Reason, _, _}} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
api_path(Path) ->
|
||||
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]).
|
||||
|
||||
json(Data) ->
|
||||
{ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx.
|
||||
|
||||
load() ->
|
||||
emqx_trace:start_link().
|
||||
|
||||
unload() ->
|
||||
gen_server:stop(emqx_trace).
|
Loading…
Reference in New Issue