diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index f72ffc229..7b7dcbc07 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,6 +1,6 @@ {application, emqx_plugin_libs, [{description, "EMQ X Plugin utility libs"}, - {vsn, "4.3.2"}, + {vsn, "4.4.0"}, {modules, []}, {applications, [kernel,stdlib]}, {env, []} diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl new file mode 100644 index 000000000..eb2ed7276 --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl @@ -0,0 +1,487 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace). + +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[Trace]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ start_link/0 + , list/0 + , list/1 + , get_trace_filename/1 + , create/1 + , delete/1 + , clear/0 + , update/2 + ]). + +-export([ format/1 + , zip_dir/0 + , trace_dir/0 + , trace_file/1 + , delete_files_after_send/2 + ]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(TRACE, ?MODULE). +-define(PACKETS, tuple_to_list(?TYPE_NAMES)). +-define(MAX_SIZE, 30). + +-ifdef(TEST). +-export([log_file/2]). +-endif. + +-record(?TRACE, + { name :: binary() | undefined | '_' + , type :: clientid | topic | undefined | '_' + , topic :: emqx_types:topic() | undefined | '_' + , clientid :: emqx_types:clientid() | undefined | '_' + , packets = [] :: list() | '_' + , enable = true :: boolean() | '_' + , start_at :: integer() | undefined | binary() | '_' + , end_at :: integer() | undefined | binary() | '_' + , log_size = #{} :: map() | '_' + }). + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?TRACE, [ + {type, set}, + {disc_copies, [node()]}, + {record_name, ?TRACE}, + {attributes, record_info(fields, ?TRACE)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TRACE, disc_copies). + +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec list() -> [tuple()]. +list() -> + ets:match_object(?TRACE, #?TRACE{_ = '_'}). + +-spec list(boolean()) -> [tuple()]. +list(Enable) -> + ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). + +-spec create([{Key :: binary(), Value :: binary()}]) -> + ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. +create(Trace) -> + case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of + true -> + case to_trace(Trace) of + {ok, TraceRec} -> create_new_trace(TraceRec); + {error, Reason} -> {error, Reason} + end; + false -> + {error, """The number of traces created has reached the maximum, +please delete the useless ones first"""} + end. + +-spec delete(Name :: binary()) -> ok | {error, not_found}. +delete(Name) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name) of + [_] -> mnesia:delete(?TRACE, Name, write); + [] -> mnesia:abort(not_found) + end + end, + transaction(Tran). + +-spec clear() -> ok | {error, Reason :: term()}. +clear() -> + case mnesia:clear_table(?TRACE) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end. + +-spec update(Name :: binary(), Enable :: boolean()) -> + ok | {error, not_found | finished}. +update(Name, Enable) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name) of + [] -> mnesia:abort(not_found); + [#?TRACE{enable = Enable}] -> ok; + [Rec] -> + case erlang:system_time(second) >= Rec#?TRACE.end_at of + false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); + true -> mnesia:abort(finished) + end + end + end, + transaction(Tran). + +-spec get_trace_filename(Name :: binary()) -> + {ok, FileName :: string()} | {error, not_found}. +get_trace_filename(Name) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name, read) of + [] -> mnesia:abort(not_found); + [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)} + end end, + transaction(Tran). + +-spec trace_file(File :: list()) -> + {ok, Node :: list(), Binary :: binary()} | + {error, Node :: list(), Reason :: term()}. +trace_file(File) -> + FileName = filename:join(trace_dir(), File), + Node = atom_to_list(node()), + case file:read_file(FileName) of + {ok, Bin} -> {ok, Node, Bin}; + {error, Reason} -> {error, Node, Reason} + end. + +delete_files_after_send(TraceLog, Zips) -> + gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}). + +-spec format(list(#?TRACE{})) -> list(map()). +format(Traces) -> + Fields = record_info(fields, ?TRACE), + lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) -> + Trace = Trace0#?TRACE{ + start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)), + end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt)) + }, + [_ | Values] = tuple_to_list(Trace), + maps:from_list(lists:zip(Fields, Values)) + end, Traces). + +init([]) -> + erlang:process_flag(trap_exit, true), + OriginLogLevel = emqx_logger:get_primary_log_level(), + ok = filelib:ensure_dir(trace_dir()), + ok = filelib:ensure_dir(zip_dir()), + {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), + Traces = get_enable_trace(), + ok = update_log_primary_level(Traces, OriginLogLevel), + TRef = update_trace(Traces), + {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ok, State}. + +handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) -> + erlang:monitor(process, Pid), + {noreply, State#{monitors => Monitors#{Pid => Files}}}; +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) -> + case maps:take(Pid, Monitors) of + error -> {noreply, State}; + {Files, NewMonitors} -> + lists:foreach(fun(F) -> file:delete(F) end, Files), + {noreply, State#{monitors => NewMonitors}} + end; +handle_info({timeout, TRef, update_trace}, + #{timer := TRef, primary_log_level := OriginLogLevel} = State) -> + Traces = get_enable_trace(), + ok = update_log_primary_level(Traces, OriginLogLevel), + NextTRef = update_trace(Traces), + {noreply, State#{timer => NextTRef}}; + +handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef), + handle_info({timeout, TRef, update_trace}, State); + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> + ok = set_log_primary_level(OriginLogLevel), + _ = mnesia:unsubscribe({table, ?TRACE, simple}), + emqx_misc:cancel_timer(TRef), + stop_all_trace_handler(), + _ = file:del_dir_r(zip_dir()), + ok. + +code_change(_, State, _Extra) -> + {ok, State}. + +create_new_trace(Trace) -> + Tran = fun() -> + case mnesia:read(?TRACE, Trace#?TRACE.name) of + [] -> + #?TRACE{start_at = StartAt, topic = Topic, + clientid = ClientId, packets = Packets} = Trace, + Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic, + clientid = ClientId, packets = Packets}, + case mnesia:match_object(?TRACE, Match, read) of + [] -> mnesia:write(?TRACE, Trace, write); + [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) + end; + [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name}) + end + end, + transaction(Tran). + +update_trace(Traces) -> + Now = erlang:system_time(second), + {_Waiting, Running, Finished} = classify_by_time(Traces, Now), + disable_finished(Finished), + Started = already_running(), + {NeedRunning, AllStarted} = start_trace(Running, Started), + NeedStop = AllStarted -- NeedRunning, + ok = stop_trace(NeedStop, Started), + clean_stale_trace_files(), + NextTime = find_closest_time(Traces, Now), + emqx_misc:start_timer(NextTime, update_trace). + +stop_all_trace_handler() -> + lists:foreach(fun(#{type := Type, name := Name} = Trace) -> + _ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name) + end + , already_running()). + +already_running() -> + emqx_tracer:lookup_traces(). + +get_enable_trace() -> + {atomic, Traces} = + mnesia:transaction(fun() -> + mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) + end), + Traces. + +find_closest_time(Traces, Now) -> + Sec = + lists:foldl( + fun(#?TRACE{start_at = Start, end_at = End}, Closest) + when Start >= Now andalso Now < End -> %% running + min(End - Now, Closest); + (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting + min(Now - Start, Closest); + (_, Closest) -> Closest %% finished + end, 60 * 15, Traces), + timer:seconds(Sec). + +disable_finished([]) -> ok; +disable_finished(Traces) -> + NameWithLogSize = + lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) -> + FileSize = filelib:file_size(log_file(Name, StartAt)), + {Name, FileSize} end, Traces), + transaction(fun() -> + lists:map(fun({Name, LogSize}) -> + case mnesia:read(?TRACE, Name, write) of + [] -> ok; + [Trace = #?TRACE{log_size = Logs}] -> + mnesia:write(?TRACE, Trace#?TRACE{enable = false, + log_size = Logs#{node() => LogSize}}, write) + end end, NameWithLogSize) + end). + +start_trace(Traces, Started0) -> + Started = lists:map(fun(#{name := Name}) -> Name end, Started0), + lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> + case lists:member(Name, StartedAcc) of + true -> {[Name | Running], StartedAcc}; + false -> + case start_trace(Trace) of + ok -> {[Name | Running], [Name | StartedAcc]}; + Error -> + ?LOG(error, "(~p)start trace failed by:~p", [Name, Error]), + {[Name | Running], StartedAcc} + end + end + end, {[], Started}, Traces). + +start_trace(Trace) -> + #?TRACE{name = Name + , type = Type + , clientid = ClientId + , topic = Topic + , packets = Packets + , start_at = Start + } = Trace, + Who0 = #{name => Name, labels => Packets}, + Who = + case Type of + topic -> Who0#{type => topic, topic => Topic}; + clientid -> Who0#{type => clientid, clientid => ClientId} + end, + case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of + ok -> ok; + {error, {already_exist, _}} -> ok; + {error, Reason} -> {error, Reason} + end. + +stop_trace(Finished, Started) -> + lists:foreach(fun(#{name := Name, type := Type} = Trace) -> + case lists:member(Name, Finished) of + true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name); + false -> ok + end + end, Started). + +clean_stale_trace_files() -> + TraceDir = trace_dir(), + case file:list_dir(TraceDir) of + {ok, AllFiles} when AllFiles =/= ["zip"] -> + FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end, + KeepFiles = lists:map(FileFun, list()), + case AllFiles -- ["zip" | KeepFiles] of + [] -> ok; + DeleteFiles -> + DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end, + lists:foreach(DelFun, DeleteFiles) + end; + _ -> ok + end. + +classify_by_time(Traces, Now) -> + classify_by_time(Traces, Now, [], [], []). + +classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish}; +classify_by_time([Trace = #?TRACE{start_at = Start} | Traces], + Now, Wait, Run, Finish) when Start > Now -> + classify_by_time(Traces, Now, [Trace | Wait], Run, Finish); +classify_by_time([Trace = #?TRACE{end_at = End} | Traces], + Now, Wait, Run, Finish) when End =< Now -> + classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]); +classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> + classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). + +to_trace(TraceList) -> + case to_trace(TraceList, #?TRACE{}) of + {error, Reason} -> {error, Reason}; + {ok, #?TRACE{name = undefined}} -> + {error, "name required"}; + {ok, #?TRACE{type = undefined}} -> + {error, "type required"}; + {ok, #?TRACE{topic = undefined, clientid = undefined}} -> + {error, "topic/clientid cannot be both empty"}; + {ok, Trace} -> + case fill_default(Trace) of + #?TRACE{start_at = Start, end_at = End} when End =< Start -> + {error, "failed by start_at >= end_at"}; + Trace1 -> {ok, Trace1} + end + end. + +fill_default(Trace = #?TRACE{start_at = undefined}) -> + fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); +fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> + fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60}); +fill_default(Trace) -> Trace. + +to_trace([], Rec) -> {ok, Rec}; +to_trace([{<<"name">>, Name} | Trace], Rec) -> + case binary:match(Name, [<<"/">>], []) of + nomatch -> to_trace(Trace, Rec#?TRACE{name = Name}); + _ -> {error, "name cannot contain /"} + end; +to_trace([{<<"type">>, Type} | Trace], Rec) -> + case lists:member(Type, [<<"clientid">>, <<"topic">>]) of + true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); + false -> {error, "incorrect type: only support clientid/topic"} + end; +to_trace([{<<"topic">>, Topic} | Trace], Rec) -> + case validate_topic(Topic) of + ok -> to_trace(Trace, Rec#?TRACE{topic = Topic}); + {error, Reason} -> {error, Reason} + end; +to_trace([{<<"start_at">>, StartAt} | Trace], Rec) -> + case to_system_second(StartAt) of + {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); + {error, Reason} -> {error, Reason} + end; +to_trace([{<<"end_at">>, EndAt} | Trace], Rec) -> + Now = erlang:system_time(second), + case to_system_second(EndAt) of + {ok, Sec} when Sec > Now -> + to_trace(Trace, Rec#?TRACE{end_at = Sec}); + {ok, _Sec} -> + {error, "end_at time has already passed"}; + {error, Reason} -> + {error, Reason} + end; +to_trace([{<<"clientid">>, ClientId} | Trace], Rec) -> + to_trace(Trace, Rec#?TRACE{clientid = ClientId}); +to_trace([{<<"packets">>, PacketList} | Trace], Rec) -> + case to_packets(PacketList) of + {ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets}); + {error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])} + end; +to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. + +validate_topic(TopicName) -> + try emqx_topic:validate(name, TopicName) of + true -> ok + catch + error:Error -> + {error, io_lib:format("~s invalid by ~p", [TopicName, Error])} + end. + +to_system_second(At) -> + try + Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]), + {ok, Sec} + catch error: {badmatch, _} -> + {error, ["The rfc3339 specification not satisfied: ", At]} + end. + +to_packets(Packets) when is_list(Packets) -> + AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets), + case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of + [] -> {ok, AtomTypes}; + InvalidE -> {error, InvalidE} + end; +to_packets(Packets) -> {error, Packets}. + +zip_dir() -> + trace_dir() ++ "zip/". + +trace_dir() -> + filename:join(emqx:get_env(data_dir), "trace") ++ "/". + +log_file(Name, Start) -> + filename:join(trace_dir(), filename(Name, Start)). + +filename(Name, Start) -> + [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading), + lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]). + +transaction(Tran) -> + case mnesia:transaction(Tran) of + {atomic, Res} -> Res; + {aborted, Reason} -> {error, Reason} + end. + +update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel); +update_log_primary_level(_, _) -> set_log_primary_level(debug). + +set_log_primary_level(NewLevel) -> + case NewLevel =/= emqx_logger:get_primary_log_level() of + true -> emqx_logger:set_primary_log_level(NewLevel); + false -> ok + end. diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl new file mode 100644 index 000000000..1bc71654e --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -0,0 +1,147 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_trace_api). +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([ list_trace/2 + , create_trace/2 + , update_trace/2 + , delete_trace/2 + , clear_traces/2 + , download_zip_log/2 + , stream_log_file/2 +]). +-export([read_trace_file/3]). + +-define(TO_BIN(_B_), iolist_to_binary(_B_)). +-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])}). + +list_trace(_, Params) -> + List = + case Params of + [{<<"enable">>, Enable}] -> emqx_trace:list(Enable); + _ -> emqx_trace:list() + end, + {ok, emqx_trace:format(List)}. + +create_trace(_, Param) -> + case emqx_trace:create(Param) of + ok -> ok; + {error, {already_existed, Name}} -> + {error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])}; + {error, {duplicate_condition, Name}} -> + {error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])}; + {error, Reason} -> + {error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)} + end. + +delete_trace(#{name := Name}, _Param) -> + case emqx_trace:delete(Name) of + ok -> ok; + {error, not_found} -> ?NOT_FOUND(Name) + end. + +clear_traces(_, _) -> + emqx_trace:clear(). + +update_trace(#{name := Name, operation := Operation}, _Param) -> + Enable = case Operation of disable -> false; enable -> true end, + case emqx_trace:update(Name, Enable) of + ok -> {ok, #{enable => Enable, name => Name}}; + {error, not_found} -> ?NOT_FOUND(Name) + end. + +%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. +%% cowboy_compress_h will auto encode gzip format. +download_zip_log(#{name := Name}, _Param) -> + case emqx_trace:get_trace_filename(Name) of + {ok, TraceLog} -> + TraceFiles = collect_trace_file(TraceLog), + ZipDir = emqx_trace:zip_dir(), + Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), + ZipFileName = ZipDir ++ TraceLog, + {ok, ZipFile} = zip:zip(ZipFileName, Zips), + emqx_trace:delete_files_after_send(ZipFileName, Zips), + {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; + {error, Reason} -> + {error, Reason} + end. + +group_trace_file(ZipDir, TraceLog, TraceFiles) -> + lists:foldl(fun(Res, Acc) -> + case Res of + {ok, Node, Bin} -> + ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, + ok = file:write_file(ZipName, Bin), + [ZipName | Acc]; + {error, Node, Reason} -> + ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), + Acc + end + end, [], TraceFiles). + +collect_trace_file(TraceLog) -> + Nodes = ekka_mnesia:running_nodes(), + {Files, BadNodes} = rpc:multicall(Nodes, emqx_trace, trace_file, [TraceLog], 60000), + BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), + Files. + +%% _page as position and _limit as bytes for front-end reusing components +stream_log_file(#{name := Name}, Params) -> + Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), + Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>), + Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>), + Node = binary_to_existing_atom(Node0), + Position = binary_to_integer(Position0), + Bytes = binary_to_integer(Bytes0), + case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + {ok, Bin} -> + Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes}, + {ok, #{meta => Meta, items => Bin}}; + eof -> + Meta = #{<<"page">> => Position, <<"limit">> => Bytes}, + {ok, #{meta => Meta, items => <<"">>}}; + {error, Reason} -> + logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), + {error, Reason} + end. + +%% this is an rpc call for stream_log_file/2 +read_trace_file(Name, Position, Limit) -> + TraceDir = emqx_trace:trace_dir(), + {ok, AllFiles} = file:list_dir(TraceDir), + TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_", + Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end, + case lists:filter(Filter, AllFiles) of + [TraceFile] -> + TracePath = filename:join([TraceDir, TraceFile]), + read_file(TracePath, Position, Limit); + [] -> {error, not_found} + end. + +read_file(Path, Offset, Bytes) -> + {ok, IoDevice} = file:open(Path, [read, raw, binary]), + try + _ = case Offset of + 0 -> ok; + _ -> file:position(IoDevice, {bof, Offset}) + end, + file:read(IoDevice, Bytes) + after + file:close(IoDevice) + end. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl new file mode 100644 index 000000000..1bd28d888 --- /dev/null +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -0,0 +1,353 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_SUITE). + +%% API +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-record(emqx_trace, { + name, + type, + topic, + clientid, + packets = [], + enable = true, + start_at, + end_at, + log_size = #{} + }). + +-define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL' + , 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK' + , 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx_plugin_libs), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_base_create_delete(_Config) -> + ok = emqx_trace:clear(), + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + End = to_rfc3339(Now + 30 * 60), + Name = <<"name1">>, + ClientId = <<"test-device">>, + Packets = [atom_to_binary(E) || E <- ?PACKETS], + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, + {<<"clientid">>, ClientId}, + {<<"packets">>, Packets}, + {<<"start_at">>, Start}, + {<<"end_at">>, End} + ], + AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}), + ok = emqx_trace:create(Trace), + ?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)), + ?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)), + [TraceRec] = emqx_trace:list(), + Expect = #emqx_trace{ + name = Name, + type = clientid, + topic = undefined, + clientid = ClientId, + packets = ?PACKETS, + start_at = Now, + end_at = Now + 30 * 60 + }, + ?assertEqual(Expect, TraceRec), + ExpectFormat = [ + #{ + clientid => <<"test-device">>, + enable => true, + type => clientid, + packets => ?PACKETS, + name => <<"name1">>, + start_at => Start, + end_at => End, + log_size => #{}, + topic => undefined + } + ], + ?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])), + ?assertEqual(ok, emqx_trace:delete(Name)), + ?assertEqual({error, not_found}, emqx_trace:delete(Name)), + ?assertEqual([], emqx_trace:list()), + ok. + +t_create_size_max(_Config) -> + emqx_trace:clear(), + lists:map(fun(Seq) -> + Name = list_to_binary("name" ++ integer_to_list(Seq)), + Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], + ok = emqx_trace:create(Trace) + end, lists:seq(1, 30)), + Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}], + {error, _} = emqx_trace:create(Trace31), + ok = emqx_trace:delete(<<"name30">>), + ok = emqx_trace:create(Trace31), + ?assertEqual(30, erlang:length(emqx_trace:list())), + ok. + +t_create_failed(_Config) -> + ok = emqx_trace:clear(), + UnknownField = [{<<"unknown">>, 12}], + {error, Reason1} = emqx_trace:create(UnknownField), + ?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)), + + InvalidTopic = [{<<"topic">>, "#/#//"}], + {error, Reason2} = emqx_trace:create(InvalidTopic), + ?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)), + + InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], + {error, Reason3} = emqx_trace:create(InvalidStart), + ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, + iolist_to_binary(Reason3)), + + InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}], + {error, Reason4} = emqx_trace:create(InvalidEnd), + ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, + iolist_to_binary(Reason4)), + + InvalidPackets = [{<<"packets">>, [<<"publish">>]}], + {error, Reason5} = emqx_trace:create(InvalidPackets), + ?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)), + + InvalidPackets2 = [{<<"packets">>, <<"publish">>}], + {error, Reason6} = emqx_trace:create(InvalidPackets2), + ?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)), + + {error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), + ?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)), + + InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, + {<<"type">>, <<"clientid">>}], + {error, Reason9} = emqx_trace:create(InvalidPackets4), + ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)), + + ?assertEqual({error, "type required"}, emqx_trace:create([{<<"name">>, <<"test-name">>}, + {<<"packets">>, []}, {<<"clientid">>, <<"good">>}])), + + ?assertEqual({error, "incorrect type: only support clientid/topic"}, + emqx_trace:create([{<<"name">>, <<"test-name">>}, + {<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), + ok. + +t_create_default(_Config) -> + ok = emqx_trace:clear(), + {error, "name required"} = emqx_trace:create([]), + ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, + {<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]), + [#emqx_trace{packets = Packets}] = emqx_trace:list(), + ?assertEqual([], Packets), + ok = emqx_trace:clear(), + Trace = [ + {<<"name">>, <<"test-name">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, + {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>} + ], + {error, "end_at time has already passed"} = emqx_trace:create(Trace), + Now = erlang:system_time(second), + Trace2 = [ + {<<"name">>, <<"test-name">>}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"start_at">>, to_rfc3339(Now + 10)}, + {<<"end_at">>, to_rfc3339(Now + 3)} + ], + {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2), + ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]), + [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(), + ?assertEqual(10 * 60, End - Start), + ?assertEqual(true, Start - erlang:system_time(second) < 5), + ok. + +t_update_enable(_Config) -> + ok = emqx_trace:clear(), + Name = <<"test-name">>, + Now = erlang:system_time(second), + End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), + ok = emqx_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]}, + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), + [#emqx_trace{enable = Enable}] = emqx_trace:list(), + ?assertEqual(Enable, true), + ok = emqx_trace:update(Name, false), + [#emqx_trace{enable = false}] = emqx_trace:list(), + ok = emqx_trace:update(Name, false), + [#emqx_trace{enable = false}] = emqx_trace:list(), + ok = emqx_trace:update(Name, true), + [#emqx_trace{enable = true}] = emqx_trace:list(), + ok = emqx_trace:update(Name, false), + [#emqx_trace{enable = false}] = emqx_trace:list(), + ?assertEqual({error, not_found}, emqx_trace:update(<<"Name not found">>, true)), + ct:sleep(2100), + ?assertEqual({error, finished}, emqx_trace:update(Name, true)), + ok. + +t_load_state(_Config) -> + emqx_trace:clear(), + load(), + Now = erlang:system_time(second), + Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, + {<<"end_at">>, to_rfc3339(Now + 2)}], + Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, + {<<"end_at">>, to_rfc3339(Now + 8)}], + Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, + {<<"end_at">>, to_rfc3339(Now)}], + ok = emqx_trace:create(Running), + ok = emqx_trace:create(Waiting), + {error, "end_at time has already passed"} = emqx_trace:create(Finished), + Traces = emqx_trace:format(emqx_trace:list()), + ?assertEqual(2, erlang:length(Traces)), + Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces), + ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}], + ?assertEqual(ExpectEnables, lists:sort(Enables)), + ct:sleep(3500), + Traces2 = emqx_trace:format(emqx_trace:list()), + ?assertEqual(2, erlang:length(Traces2)), + Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2), + ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}], + ?assertEqual(ExpectEnables2, lists:sort(Enables2)), + unload(), + ok. + +t_client_event(_Config) -> + application:set_env(emqx, allow_anonymous, true), + emqx_trace:clear(), + ClientId = <<"client-test">>, + load(), + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + Name = <<"test_client_id_event">>, + ok = emqx_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + emqtt:ping(Client), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), + ct:sleep(200), + ok = emqx_trace:create([{<<"name">>, <<"test_topic">>}, + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ct:sleep(200), + {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)), + {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)), + ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), + ?assert(erlang:byte_size(Bin) > 0), + ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), + ?assert(erlang:byte_size(Bin3) > 0), + unload(), + ok. + +t_get_log_filename(_Config) -> + ok = emqx_trace:clear(), + load(), + Now = erlang:system_time(second), + Start = calendar:system_time_to_rfc3339(Now), + End = calendar:system_time_to_rfc3339(Now + 2), + Name = <<"name1">>, + ClientId = <<"test-device">>, + Packets = [atom_to_binary(E) || E <- ?PACKETS], + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, + {<<"clientid">>, ClientId}, + {<<"packets">>, Packets}, + {<<"start_at">>, list_to_binary(Start)}, + {<<"end_at">>, list_to_binary(End)} + ], + ok = emqx_trace:create(Trace), + ?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)), + ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), + ct:sleep(3000), + ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), + unload(), + ok. + +t_trace_file(_Config) -> + FileName = "test.log", + Content = <<"test \n test">>, + TraceDir = emqx_trace:trace_dir(), + File = filename:join(TraceDir, FileName), + ok = file:write_file(File, Content), + {ok, Node, Bin} = emqx_trace:trace_file(FileName), + ?assertEqual(Node, atom_to_list(node())), + ?assertEqual(Content, Bin), + ok = file:delete(File), + ok. + +t_download_log(_Config) -> + emqx_trace:clear(), + load(), + ClientId = <<"client-test">>, + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + Name = <<"test_client_id">>, + ok = emqx_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], + ct:sleep(100), + {ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} = + emqx_trace_api:download_zip_log(#{name => Name}, []), + ?assert(ZipFileSize > 0), + ok = emqtt:disconnect(Client), + unload(), + ok. + +to_rfc3339(Second) -> + list_to_binary(calendar:system_time_to_rfc3339(Second)). + +load() -> + emqx_trace:start_link(). + +unload() -> + gen_server:stop(emqx_trace). diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl index 40a978a37..59318a5a1 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl @@ -41,7 +41,8 @@ start_listeners() -> lists:foreach(fun(Listener) -> start_listener(Listener) end, listeners()). -%% Start HTTP Listener + +%% Start HTTP(S) Listener start_listener({Proto, Port, Options}) -> Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}}, {"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}}, @@ -88,7 +89,7 @@ listener_name(Proto) -> http_handlers() -> Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()), [{"/api/v4/", - minirest:handler(#{apps => Plugins ++ [emqx_modules, emqx_plugin_libs], + minirest:handler(#{apps => Plugins ++ [emqx_modules, emqx_plugin_libs], filter => fun ?MODULE:filter/1}), [{authorization, fun ?MODULE:is_authorized/1}]}]. diff --git a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 550f1e9de..71a7692be 100644 --- a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -166,4 +166,3 @@ api_path(Path) -> json(Data) -> {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. - diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace.erl b/lib-ce/emqx_modules/src/emqx_mod_trace.erl index 737ee7e97..03d468a82 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace.erl @@ -16,493 +16,24 @@ -module(emqx_mod_trace). --behaviour(gen_server). -behaviour(emqx_gen_mod). - -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("kernel/include/file.hrl"). - --logger_header("[Trace]"). - -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([ load/1 , unload/1 , description/0 ]). --export([ start_link/0 - , list/0 - , list/1 - , get_trace_filename/1 - , create/1 - , delete/1 - , clear/0 - , update/2 - ]). - --export([ format/1 - , zip_dir/0 - , trace_dir/0 - , trace_file/1 - , delete_files_after_send/2 - ]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(TRACE, ?MODULE). --define(PACKETS, tuple_to_list(?TYPE_NAMES)). --define(MAX_SIZE, 30). - --ifdef(TEST). --export([log_file/2]). --endif. - --record(?TRACE, - { name :: binary() | undefined | '_' - , type :: clientid | topic | undefined | '_' - , topic :: emqx_types:topic() | undefined | '_' - , clientid :: emqx_types:clientid() | undefined | '_' - , packets = [] :: list() | '_' - , enable = true :: boolean() | '_' - , start_at :: integer() | undefined | binary() | '_' - , end_at :: integer() | undefined | binary() | '_' - , log_size = #{} :: map() | '_' - }). - -mnesia(boot) -> - ok = ekka_mnesia:create_table(?TRACE, [ - {type, set}, - {disc_copies, [node()]}, - {record_name, ?TRACE}, - {attributes, record_info(fields, ?TRACE)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TRACE, disc_copies). - +-spec description() -> string(). description() -> "EMQ X Trace Module". -spec load(any()) -> ok. load(_Env) -> - emqx_mod_sup:start_child(?MODULE, worker). + emqx_mod_sup:start_child(emqx_trace, worker). -spec unload(any()) -> ok. unload(_Env) -> - _ = emqx_mod_sup:stop_child(?MODULE), - stop_all_trace_handler(). - --spec(start_link() -> emqx_types:startlink_ret()). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec list() -> [tuple()]. -list() -> - ets:match_object(?TRACE, #?TRACE{_ = '_'}). - --spec list(boolean()) -> [tuple()]. -list(Enable) -> - ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). - --spec create([{Key :: binary(), Value :: binary()}]) -> - ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. -create(Trace) -> - case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of - true -> - case to_trace(Trace) of - {ok, TraceRec} -> create_new_trace(TraceRec); - {error, Reason} -> {error, Reason} - end; - false -> - {error, """The number of traces created has reached the maximum, -please delete the useless ones first"""} - end. - --spec delete(Name :: binary()) -> ok | {error, not_found}. -delete(Name) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name) of - [_] -> mnesia:delete(?TRACE, Name, write); - [] -> mnesia:abort(not_found) - end - end, - transaction(Tran). - --spec clear() -> ok | {error, Reason :: term()}. -clear() -> - case mnesia:clear_table(?TRACE) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end. - --spec update(Name :: binary(), Enable :: boolean()) -> - ok | {error, not_found | finished}. -update(Name, Enable) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name) of - [] -> mnesia:abort(not_found); - [#?TRACE{enable = Enable}] -> ok; - [Rec] -> - case erlang:system_time(second) >= Rec#?TRACE.end_at of - false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); - true -> mnesia:abort(finished) - end - end - end, - transaction(Tran). - --spec get_trace_filename(Name :: binary()) -> - {ok, FileName :: string()} | {error, not_found}. -get_trace_filename(Name) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name, read) of - [] -> mnesia:abort(not_found); - [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)} - end end, - transaction(Tran). - --spec trace_file(File :: list()) -> - {ok, Node :: list(), Binary :: binary()} | - {error, Node :: list(), Reason :: term()}. -trace_file(File) -> - FileName = filename:join(trace_dir(), File), - Node = atom_to_list(node()), - case file:read_file(FileName) of - {ok, Bin} -> {ok, Node, Bin}; - {error, Reason} -> {error, Node, Reason} - end. - -delete_files_after_send(TraceLog, Zips) -> - gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}). - --spec format(list(#?TRACE{})) -> list(map()). -format(Traces) -> - Fields = record_info(fields, ?TRACE), - lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) -> - Trace = Trace0#?TRACE{ - start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)), - end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt)) - }, - [_ | Values] = tuple_to_list(Trace), - maps:from_list(lists:zip(Fields, Values)) - end, Traces). - -init([]) -> - erlang:process_flag(trap_exit, true), - OriginLogLevel = emqx_logger:get_primary_log_level(), - ok = filelib:ensure_dir(trace_dir()), - ok = filelib:ensure_dir(zip_dir()), - {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), - Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), - TRef = update_trace(Traces), - {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ok, State}. - -handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) -> - erlang:monitor(process, Pid), - {noreply, State#{monitors => Monitors#{Pid => Files}}}; -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) -> - case maps:take(Pid, Monitors) of - error -> {noreply, State}; - {Files, NewMonitors} -> - lists:foreach(fun(F) -> file:delete(F) end, Files), - {noreply, State#{monitors => NewMonitors}} - end; -handle_info({timeout, TRef, update_trace}, - #{timer := TRef, primary_log_level := OriginLogLevel} = State) -> - Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), - NextTRef = update_trace(Traces), - {noreply, State#{timer => NextTRef}}; - -handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef), - handle_info({timeout, TRef, update_trace}, State); - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> - ok = set_log_primary_level(OriginLogLevel), - _ = mnesia:unsubscribe({table, ?TRACE, simple}), - emqx_misc:cancel_timer(TRef), - stop_all_trace_handler(), - _ = file:del_dir_r(zip_dir()), - ok. - -code_change(_, State, _Extra) -> - {ok, State}. - -create_new_trace(Trace) -> - Tran = fun() -> - case mnesia:read(?TRACE, Trace#?TRACE.name) of - [] -> - #?TRACE{start_at = StartAt, topic = Topic, - clientid = ClientId, packets = Packets} = Trace, - Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic, - clientid = ClientId, packets = Packets}, - case mnesia:match_object(?TRACE, Match, read) of - [] -> mnesia:write(?TRACE, Trace, write); - [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) - end; - [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name}) - end - end, - transaction(Tran). - -update_trace(Traces) -> - Now = erlang:system_time(second), - {_Waiting, Running, Finished} = classify_by_time(Traces, Now), - disable_finished(Finished), - Started = already_running(), - {NeedRunning, AllStarted} = start_trace(Running, Started), - NeedStop = AllStarted -- NeedRunning, - ok = stop_trace(NeedStop, Started), - clean_stale_trace_files(), - NextTime = find_closest_time(Traces, Now), - emqx_misc:start_timer(NextTime, update_trace). - -stop_all_trace_handler() -> - lists:foreach(fun(#{type := Type, name := Name} = Trace) -> - _ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name) - end - , already_running()). - -already_running() -> - emqx_tracer:lookup_traces(). - -get_enable_trace() -> - {atomic, Traces} = - mnesia:transaction(fun() -> - mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) - end), - Traces. - -find_closest_time(Traces, Now) -> - Sec = - lists:foldl( - fun(#?TRACE{start_at = Start, end_at = End}, Closest) - when Start >= Now andalso Now < End -> %% running - min(End - Now, Closest); - (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting - min(Now - Start, Closest); - (_, Closest) -> Closest %% finished - end, 60 * 15, Traces), - timer:seconds(Sec). - -disable_finished([]) -> ok; -disable_finished(Traces) -> - NameWithLogSize = - lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) -> - FileSize = filelib:file_size(log_file(Name, StartAt)), - {Name, FileSize} end, Traces), - transaction(fun() -> - lists:map(fun({Name, LogSize}) -> - case mnesia:read(?TRACE, Name, write) of - [] -> ok; - [Trace = #?TRACE{log_size = Logs}] -> - mnesia:write(?TRACE, Trace#?TRACE{enable = false, - log_size = Logs#{node() => LogSize}}, write) - end end, NameWithLogSize) - end). - -start_trace(Traces, Started0) -> - Started = lists:map(fun(#{name := Name}) -> Name end, Started0), - lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> - case lists:member(Name, StartedAcc) of - true -> {[Name | Running], StartedAcc}; - false -> - case start_trace(Trace) of - ok -> {[Name | Running], [Name | StartedAcc]}; - Error -> - ?LOG(error, "(~p)start trace failed by:~p", [Name, Error]), - {[Name | Running], StartedAcc} - end - end - end, {[], Started}, Traces). - -start_trace(Trace) -> - #?TRACE{name = Name - , type = Type - , clientid = ClientId - , topic = Topic - , packets = Packets - , start_at = Start - } = Trace, - Who0 = #{name => Name, labels => Packets}, - Who = - case Type of - topic -> Who0#{type => topic, topic => Topic}; - clientid -> Who0#{type => clientid, clientid => ClientId} - end, - case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of - ok -> ok; - {error, {already_exist, _}} -> ok; - {error, Reason} -> {error, Reason} - end. - -stop_trace(Finished, Started) -> - lists:foreach(fun(#{name := Name, type := Type} = Trace) -> - case lists:member(Name, Finished) of - true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name); - false -> ok - end - end, Started). - -clean_stale_trace_files() -> - TraceDir = trace_dir(), - case file:list_dir(TraceDir) of - {ok, AllFiles} when AllFiles =/= ["zip"] -> - FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end, - KeepFiles = lists:map(FileFun, list()), - case AllFiles -- ["zip" | KeepFiles] of - [] -> ok; - DeleteFiles -> - DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end, - lists:foreach(DelFun, DeleteFiles) - end; - _ -> ok - end. - -classify_by_time(Traces, Now) -> - classify_by_time(Traces, Now, [], [], []). - -classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish}; -classify_by_time([Trace = #?TRACE{start_at = Start} | Traces], - Now, Wait, Run, Finish) when Start > Now -> - classify_by_time(Traces, Now, [Trace | Wait], Run, Finish); -classify_by_time([Trace = #?TRACE{end_at = End} | Traces], - Now, Wait, Run, Finish) when End =< Now -> - classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]); -classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> - classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). - -to_trace(TraceList) -> - case to_trace(TraceList, #?TRACE{}) of - {error, Reason} -> {error, Reason}; - {ok, #?TRACE{name = undefined}} -> - {error, "name required"}; - {ok, #?TRACE{type = undefined}} -> - {error, "type required"}; - {ok, #?TRACE{topic = undefined, clientid = undefined}} -> - {error, "topic/clientid cannot be both empty"}; - {ok, Trace} -> - case fill_default(Trace) of - #?TRACE{start_at = Start, end_at = End} when End =< Start -> - {error, "failed by start_at >= end_at"}; - Trace1 -> {ok, Trace1} - end - end. - -fill_default(Trace = #?TRACE{start_at = undefined}) -> - fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); -fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> - fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60}); -fill_default(Trace) -> Trace. - -to_trace([], Rec) -> {ok, Rec}; -to_trace([{<<"name">>, Name} | Trace], Rec) -> - case binary:match(Name, [<<"/">>], []) of - nomatch -> to_trace(Trace, Rec#?TRACE{name = Name}); - _ -> {error, "name cannot contain /"} - end; -to_trace([{<<"type">>, Type} | Trace], Rec) -> - case lists:member(Type, [<<"clientid">>, <<"topic">>]) of - true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); - false -> {error, "incorrect type: only support clientid/topic"} - end; -to_trace([{<<"topic">>, Topic} | Trace], Rec) -> - case validate_topic(Topic) of - ok -> to_trace(Trace, Rec#?TRACE{topic = Topic}); - {error, Reason} -> {error, Reason} - end; -to_trace([{<<"start_at">>, StartAt} | Trace], Rec) -> - case to_system_second(StartAt) of - {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); - {error, Reason} -> {error, Reason} - end; -to_trace([{<<"end_at">>, EndAt} | Trace], Rec) -> - Now = erlang:system_time(second), - case to_system_second(EndAt) of - {ok, Sec} when Sec > Now -> - to_trace(Trace, Rec#?TRACE{end_at = Sec}); - {ok, _Sec} -> - {error, "end_at time has already passed"}; - {error, Reason} -> - {error, Reason} - end; -to_trace([{<<"clientid">>, ClientId} | Trace], Rec) -> - to_trace(Trace, Rec#?TRACE{clientid = ClientId}); -to_trace([{<<"packets">>, PacketList} | Trace], Rec) -> - case to_packets(PacketList) of - {ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets}); - {error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])} - end; -to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. - -validate_topic(TopicName) -> - try emqx_topic:validate(name, TopicName) of - true -> ok - catch - error:Error -> - {error, io_lib:format("~s invalid by ~p", [TopicName, Error])} - end. - -to_system_second(At) -> - try - Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]), - {ok, Sec} - catch error: {badmatch, _} -> - {error, ["The rfc3339 specification not satisfied: ", At]} - end. - -to_packets(Packets) when is_list(Packets) -> - AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets), - case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of - [] -> {ok, AtomTypes}; - InvalidE -> {error, InvalidE} - end; -to_packets(Packets) -> {error, Packets}. - -zip_dir() -> - trace_dir() ++ "zip/". - -trace_dir() -> - filename:join(emqx:get_env(data_dir), "trace") ++ "/". - -log_file(Name, Start) -> - filename:join(trace_dir(), filename(Name, Start)). - -filename(Name, Start) -> - [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading), - lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]). - -transaction(Tran) -> - case mnesia:transaction(Tran) of - {atomic, Res} -> Res; - {aborted, Reason} -> {error, Reason} - end. - -update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel); -update_log_primary_level(_, _) -> set_log_primary_level(debug). - -set_log_primary_level(NewLevel) -> - case NewLevel =/= emqx_logger:get_primary_log_level() of - true -> emqx_logger:set_primary_log_level(NewLevel); - false -> ok - end. + emqx_mod_sup:stop_child(emqx_trace). diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl index 6d2813b96..a3abbedf7 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- -module(emqx_mod_trace_api). --include_lib("emqx/include/logger.hrl"). %% API -export([ list_trace/2 @@ -26,10 +25,6 @@ , download_zip_log/2 , stream_log_file/2 ]). --export([read_trace_file/3]). - --define(TO_BIN(_B_), iolist_to_binary(_B_)). --define(RETURN_NOT_FOUND(N), return({error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])})). -import(minirest, [return/1]). @@ -75,117 +70,26 @@ func => stream_log_file, descr => "download trace's log"}). -list_trace(_, Params) -> - List = - case Params of - [{<<"enable">>, Enable}] -> emqx_mod_trace:list(Enable); - _ -> emqx_mod_trace:list() - end, - return({ok, emqx_mod_trace:format(List)}). +list_trace(Path, Params) -> + return(emqx_trace_api:list_trace(Path, Params)). -create_trace(_, Param) -> - case emqx_mod_trace:create(Param) of - ok -> return(ok); - {error, {already_existed, Name}} -> - return({error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])}); - {error, {duplicate_condition, Name}} -> - return({error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])}); - {error, Reason} -> - return({error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}) +create_trace(Path, Params) -> + return(emqx_trace_api:create_trace(Path, Params)). + +delete_trace(Path, Params) -> + return(emqx_trace_api:delete_trace(Path, Params)). + +clear_traces(Path, Params) -> + return(emqx_trace_api:clear_traces(Path, Params)). + +update_trace(Path, Params) -> + return(emqx_trace_api:update_trace(Path, Params)). + +download_zip_log(Path, Params) -> + case emqx_trace_api:download_zip_log(Path, Params) of + {ok, _Header, _File}= Return -> Return; + {error, _Reason} = Err -> return(Err) end. -delete_trace(#{name := Name}, _Param) -> - case emqx_mod_trace:delete(Name) of - ok -> return(ok); - {error, not_found} -> ?RETURN_NOT_FOUND(Name) - end. - -clear_traces(_, _) -> - return(emqx_mod_trace:clear()). - -update_trace(#{name := Name, operation := Operation}, _Param) -> - Enable = case Operation of disable -> false; enable -> true end, - case emqx_mod_trace:update(Name, Enable) of - ok -> return({ok, #{enable => Enable, name => Name}}); - {error, not_found} -> ?RETURN_NOT_FOUND(Name) - end. - -%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. -%% cowboy_compress_h will auto encode gzip format. -download_zip_log(#{name := Name}, _Param) -> - case emqx_mod_trace:get_trace_filename(Name) of - {ok, TraceLog} -> - TraceFiles = collect_trace_file(TraceLog), - ZipDir = emqx_mod_trace:zip_dir(), - Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), - ZipFileName = ZipDir ++ TraceLog, - {ok, ZipFile} = zip:zip(ZipFileName, Zips), - emqx_mod_trace:delete_files_after_send(ZipFileName, Zips), - {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; - {error, Reason} -> - return({error, Reason}) - end. - -group_trace_file(ZipDir, TraceLog, TraceFiles) -> - lists:foldl(fun(Res, Acc) -> - case Res of - {ok, Node, Bin} -> - ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, - ok = file:write_file(ZipName, Bin), - [ZipName | Acc]; - {error, Node, Reason} -> - ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), - Acc - end - end, [], TraceFiles). - -collect_trace_file(TraceLog) -> - Nodes = ekka_mnesia:running_nodes(), - {Files, BadNodes} = rpc:multicall(Nodes, emqx_mod_trace, trace_file, [TraceLog], 60000), - BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), - Files. - -%% _page as position and _limit as bytes for front-end reusing components -stream_log_file(#{name := Name}, Params) -> - Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), - Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>), - Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>), - Node = binary_to_existing_atom(Node0), - Position = binary_to_integer(Position0), - Bytes = binary_to_integer(Bytes0), - case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of - {ok, Bin} -> - Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes}, - return({ok, #{meta => Meta, items => Bin}}); - eof -> - Meta = #{<<"page">> => Position, <<"limit">> => Bytes}, - return({ok, #{meta => Meta, items => <<"">>}}); - {error, Reason} -> - logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), - return({error, Reason}) - end. - -%% this is an rpc call for stream_log_file/2 -read_trace_file(Name, Position, Limit) -> - TraceDir = emqx_mod_trace:trace_dir(), - {ok, AllFiles} = file:list_dir(TraceDir), - TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_", - Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end, - case lists:filter(Filter, AllFiles) of - [TraceFile] -> - TracePath = filename:join([TraceDir, TraceFile]), - read_file(TracePath, Position, Limit); - [] -> {error, not_found} - end. - -read_file(Path, Offset, Bytes) -> - {ok, IoDevice} = file:open(Path, [read, raw, binary]), - try - _ = case Offset of - 0 -> ok; - _ -> file:position(IoDevice, {bof, Offset}) - end, - file:read(IoDevice, Bytes) - after - file:close(IoDevice) - end. +stream_log_file(Path, Params) -> + return(emqx_trace_api:stream_log_file(Path, Params)). diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl deleted file mode 100644 index c643c7908..000000000 --- a/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl +++ /dev/null @@ -1,478 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_mod_trace_SUITE). - -%% API --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx.hrl"). - --define(HOST, "http://127.0.0.1:18083/"). --define(API_VERSION, "v4"). --define(BASE_PATH, "api"). - --record(emqx_mod_trace, { - name, - type, - topic, - clientid, - packets = [], - enable = true, - start_at, - end_at, - log_size = #{} - }). - --define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL' - , 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK' - , 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']). - -%%-------------------------------------------------------------------- -%% Setups -%%-------------------------------------------------------------------- - -all() -> - emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_modules, emqx_dashboard]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_modules, emqx_dashboard]). - -t_base_create_delete(_Config) -> - ok = emqx_mod_trace:clear(), - Now = erlang:system_time(second), - Start = to_rfc3339(Now), - End = to_rfc3339(Now + 30 * 60), - Name = <<"name1">>, - ClientId = <<"test-device">>, - Packets = [atom_to_binary(E) || E <- ?PACKETS], - Trace = [ - {<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, - {<<"clientid">>, ClientId}, - {<<"packets">>, Packets}, - {<<"start_at">>, Start}, - {<<"end_at">>, End} - ], - AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}), - ok = emqx_mod_trace:create(Trace), - ?assertEqual({error, {already_existed, Name}}, emqx_mod_trace:create(Trace)), - ?assertEqual({error, {duplicate_condition, Name}}, emqx_mod_trace:create(AnotherTrace)), - [TraceRec] = emqx_mod_trace:list(), - Expect = #emqx_mod_trace{ - name = Name, - type = clientid, - topic = undefined, - clientid = ClientId, - packets = ?PACKETS, - start_at = Now, - end_at = Now + 30 * 60 - }, - ?assertEqual(Expect, TraceRec), - ExpectFormat = [ - #{ - clientid => <<"test-device">>, - enable => true, - type => clientid, - packets => ?PACKETS, - name => <<"name1">>, - start_at => Start, - end_at => End, - log_size => #{}, - topic => undefined - } - ], - ?assertEqual(ExpectFormat, emqx_mod_trace:format([TraceRec])), - ?assertEqual(ok, emqx_mod_trace:delete(Name)), - ?assertEqual({error, not_found}, emqx_mod_trace:delete(Name)), - ?assertEqual([], emqx_mod_trace:list()), - ok. - -t_create_size_max(_Config) -> - emqx_mod_trace:clear(), - lists:map(fun(Seq) -> - Name = list_to_binary("name" ++ integer_to_list(Seq)), - Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, - {<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], - ok = emqx_mod_trace:create(Trace) - end, lists:seq(1, 30)), - Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}], - {error, _} = emqx_mod_trace:create(Trace31), - ok = emqx_mod_trace:delete(<<"name30">>), - ok = emqx_mod_trace:create(Trace31), - ?assertEqual(30, erlang:length(emqx_mod_trace:list())), - ok. - -t_create_failed(_Config) -> - ok = emqx_mod_trace:clear(), - UnknownField = [{<<"unknown">>, 12}], - {error, Reason1} = emqx_mod_trace:create(UnknownField), - ?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)), - - InvalidTopic = [{<<"topic">>, "#/#//"}], - {error, Reason2} = emqx_mod_trace:create(InvalidTopic), - ?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)), - - InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], - {error, Reason3} = emqx_mod_trace:create(InvalidStart), - ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, - iolist_to_binary(Reason3)), - - InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}], - {error, Reason4} = emqx_mod_trace:create(InvalidEnd), - ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, - iolist_to_binary(Reason4)), - - InvalidPackets = [{<<"packets">>, [<<"publish">>]}], - {error, Reason5} = emqx_mod_trace:create(InvalidPackets), - ?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)), - - InvalidPackets2 = [{<<"packets">>, <<"publish">>}], - {error, Reason6} = emqx_mod_trace:create(InvalidPackets2), - ?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)), - - {error, Reason7} = emqx_mod_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), - ?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)), - - InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}], - {error, Reason9} = emqx_mod_trace:create(InvalidPackets4), - ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)), - - ?assertEqual({error, "type required"}, emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, - {<<"packets">>, []}, {<<"clientid">>, <<"good">>}])), - - ?assertEqual({error, "incorrect type: only support clientid/topic"}, - emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, - {<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), - ok. - -t_create_default(_Config) -> - ok = emqx_mod_trace:clear(), - {error, "name required"} = emqx_mod_trace:create([]), - ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]), - [#emqx_mod_trace{packets = Packets}] = emqx_mod_trace:list(), - ?assertEqual([], Packets), - ok = emqx_mod_trace:clear(), - Trace = [ - {<<"name">>, <<"test-name">>}, - {<<"packets">>, [<<"PUBLISH">>]}, - {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/z">>}, - {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, - {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>} - ], - {error, "end_at time has already passed"} = emqx_mod_trace:create(Trace), - Now = erlang:system_time(second), - Trace2 = [ - {<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, - {<<"topic">>, <<"/x/y/z">>}, - {<<"start_at">>, to_rfc3339(Now + 10)}, - {<<"end_at">>, to_rfc3339(Now + 3)} - ], - {error, "failed by start_at >= end_at"} = emqx_mod_trace:create(Trace2), - ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]), - [#emqx_mod_trace{start_at = Start, end_at = End}] = emqx_mod_trace:list(), - ?assertEqual(10 * 60, End - Start), - ?assertEqual(true, Start - erlang:system_time(second) < 5), - ok. - -t_update_enable(_Config) -> - ok = emqx_mod_trace:clear(), - Name = <<"test-name">>, - Now = erlang:system_time(second), - End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), - ok = emqx_mod_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), - [#emqx_mod_trace{enable = Enable}] = emqx_mod_trace:list(), - ?assertEqual(Enable, true), - ok = emqx_mod_trace:update(Name, false), - [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), - ok = emqx_mod_trace:update(Name, false), - [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), - ok = emqx_mod_trace:update(Name, true), - [#emqx_mod_trace{enable = true}] = emqx_mod_trace:list(), - ok = emqx_mod_trace:update(Name, false), - [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), - ?assertEqual({error, not_found}, emqx_mod_trace:update(<<"Name not found">>, true)), - ct:sleep(2100), - ?assertEqual({error, finished}, emqx_mod_trace:update(Name, true)), - ok. - -t_load_state(_Config) -> - emqx_mod_trace:clear(), - ok = emqx_mod_trace:load(test), - Now = erlang:system_time(second), - Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, {<<"end_at">>, to_rfc3339(Now + 2)}], - Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"end_at">>, to_rfc3339(Now + 8)}], - Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"end_at">>, to_rfc3339(Now)}], - ok = emqx_mod_trace:create(Running), - ok = emqx_mod_trace:create(Waiting), - {error, "end_at time has already passed"} = emqx_mod_trace:create(Finished), - Traces = emqx_mod_trace:format(emqx_mod_trace:list()), - ?assertEqual(2, erlang:length(Traces)), - Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces), - ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}], - ?assertEqual(ExpectEnables, lists:sort(Enables)), - ct:sleep(3500), - Traces2 = emqx_mod_trace:format(emqx_mod_trace:list()), - ?assertEqual(2, erlang:length(Traces2)), - Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2), - ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}], - ?assertEqual(ExpectEnables2, lists:sort(Enables2)), - ok = emqx_mod_trace:unload(test), - ok. - -t_client_event(_Config) -> - application:set_env(emqx, allow_anonymous, true), - emqx_mod_trace:clear(), - ClientId = <<"client-test">>, - ok = emqx_mod_trace:load(test), - Now = erlang:system_time(second), - Start = to_rfc3339(Now), - Name = <<"test_client_id_event">>, - ok = emqx_mod_trace:create([{<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(200), - {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(Client), - emqtt:ping(Client), - ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), - ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), - ct:sleep(200), - ok = emqx_mod_trace:create([{<<"name">>, <<"test_topic">>}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), - ct:sleep(200), - {ok, Bin} = file:read_file(emqx_mod_trace:log_file(Name, Now)), - ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), - ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]), - ok = emqtt:disconnect(Client), - ct:sleep(200), - {ok, Bin2} = file:read_file(emqx_mod_trace:log_file(Name, Now)), - {ok, Bin3} = file:read_file(emqx_mod_trace:log_file(<<"test_topic">>, Now)), - ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), - ?assert(erlang:byte_size(Bin) > 0), - ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), - ?assert(erlang:byte_size(Bin3) > 0), - ok = emqx_mod_trace:unload(test), - ok. - -t_get_log_filename(_Config) -> - ok = emqx_mod_trace:clear(), - ok = emqx_mod_trace:load(test), - Now = erlang:system_time(second), - Start = calendar:system_time_to_rfc3339(Now), - End = calendar:system_time_to_rfc3339(Now + 2), - Name = <<"name1">>, - ClientId = <<"test-device">>, - Packets = [atom_to_binary(E) || E <- ?PACKETS], - Trace = [ - {<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, - {<<"clientid">>, ClientId}, - {<<"packets">>, Packets}, - {<<"start_at">>, list_to_binary(Start)}, - {<<"end_at">>, list_to_binary(End)} - ], - ok = emqx_mod_trace:create(Trace), - ?assertEqual({error, not_found}, emqx_mod_trace:get_trace_filename(<<"test">>)), - ?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))), - ct:sleep(3000), - ?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))), - ok = emqx_mod_trace:unload(test), - ok. - -t_trace_file(_Config) -> - FileName = "test.log", - Content = <<"test \n test">>, - TraceDir = emqx_mod_trace:trace_dir(), - File = filename:join(TraceDir, FileName), - ok = file:write_file(File, Content), - {ok, Node, Bin} = emqx_mod_trace:trace_file(FileName), - ?assertEqual(Node, atom_to_list(node())), - ?assertEqual(Content, Bin), - ok = file:delete(File), - ok. - -t_http_test(_Config) -> - emqx_mod_trace:clear(), - emqx_mod_trace:load(test), - Header = auth_header_(), - %% list - {ok, Empty} = request_api(get, api_path("trace"), Header), - ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(Empty)), - %% create - ErrorTrace = #{}, - {ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace), - ?assertEqual(#{<<"message">> => <<"unknown field: {}">>, <<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)), - - Name = <<"test-name">>, - Trace = [ - {<<"name">>, Name}, - {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, - {<<"topic">>, <<"/x/y/z">>} - ], - - {ok, Create} = request_api(post, api_path("trace"), Header, Trace), - ?assertEqual(#{<<"code">> => 0}, json(Create)), - - {ok, List} = request_api(get, api_path("trace"), Header), - #{<<"code">> := 0, <<"data">> := [Data]} = json(List), - ?assertEqual(Name, maps:get(<<"name">>, Data)), - - %% update - {ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}), - ?assertEqual(#{<<"code">> => 0, - <<"data">> => #{<<"enable">> => false, - <<"name">> => <<"test-name">>}}, json(Update)), - - {ok, List1} = request_api(get, api_path("trace"), Header), - #{<<"code">> := 0, <<"data">> := [Data1]} = json(List1), - ?assertEqual(false, maps:get(<<"enable">>, Data1)), - - %% delete - {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), - ?assertEqual(#{<<"code">> => 0}, json(Delete)), - - {ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header), - ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>, - <<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)), - - {ok, List2} = request_api(get, api_path("trace"), Header), - ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)), - - %% clear - {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), - ?assertEqual(#{<<"code">> => 0}, json(Create1)), - - {ok, Clear} = request_api(delete, api_path("trace"), Header), - ?assertEqual(#{<<"code">> => 0}, json(Clear)), - - emqx_mod_trace:unload(test), - ok. - -t_download_log(_Config) -> - emqx_mod_trace:clear(), - emqx_mod_trace:load(test), - ClientId = <<"client-test">>, - Now = erlang:system_time(second), - Start = to_rfc3339(Now), - Name = <<"test_client_id">>, - ok = emqx_mod_trace:create([{<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(Client), - [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], - ct:sleep(100), - {ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} = - emqx_mod_trace_api:download_zip_log(#{name => Name}, []), - ?assert(ZipFileSize > 0), - %% download zip file failed by server_closed occasionally? - %Header = auth_header_(), - %{ok, ZipBin} = request_api(get, api_path("trace/test_client_id/download"), Header), - %{ok, ZipHandler} = zip:zip_open(ZipBin), - %{ok, [ZipName]} = zip:zip_get(ZipHandler), - %?assertNotEqual(nomatch, string:find(ZipName, "test@127.0.0.1")), - %{ok, _} = file:read_file(emqx_mod_trace:log_file(<<"test_client_id">>, Now)), - ok = emqtt:disconnect(Client), - emqx_mod_trace:unload(test), - ok. - -t_stream_log(_Config) -> - application:set_env(emqx, allow_anonymous, true), - emqx_mod_trace:clear(), - emqx_mod_trace:load(test), - ClientId = <<"client-stream">>, - Now = erlang:system_time(second), - Name = <<"test_stream_log">>, - Start = to_rfc3339(Now - 10), - ok = emqx_mod_trace:create([{<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(200), - {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(Client), - [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], - emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]), - emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]), - ok = emqtt:disconnect(Client), - ct:sleep(200), - File = emqx_mod_trace:log_file(Name, Now), - ct:pal("FileName: ~p", [File]), - {ok, FileBin} = file:read_file(File), - ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), - Header = auth_header_(), - {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header), - #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary), - ?assertEqual(10, byte_size(Bin)), - ?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta), - {ok, Binary1} = request_api(get, api_path("trace/test_stream_log/log?_page=20&_limit=10"), Header), - #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1), - ?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1), - ?assertEqual(10, byte_size(Bin1)), - emqx_mod_trace:unload(test), - ok. - -to_rfc3339(Second) -> - list_to_binary(calendar:system_time_to_rfc3339(Second)). - -auth_header_() -> - auth_header_("admin", "public"). - -auth_header_(User, Pass) -> - Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), - {"Authorization", "Basic " ++ Encoded}. - -request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). - -request_api(Method, Url, Auth, Body) -> - Request = {Url, [Auth], "application/json", emqx_json:encode(Body)}, - do_request_api(Method, Request). - -do_request_api(Method, Request) -> - ct:pal("Method: ~p, Request: ~p", [Method, Request]), - case httpc:request(Method, Request, [], [{body_format, binary}]) of - {error, socket_closed_remotely} -> - {error, socket_closed_remotely}; - {error,{shutdown, server_closed}} -> - {error, server_closed}; - {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } - when Code =:= 200 orelse Code =:= 201 -> - {ok, Return}; - {ok, {Reason, _, _}} -> - {error, Reason} - end. - -api_path(Path) -> - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]). - -json(Data) -> - {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl new file mode 100644 index 000000000..609a2d93c --- /dev/null +++ b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl @@ -0,0 +1,179 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mod_trace_api_SUITE). + +%% API +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-define(HOST, "http://127.0.0.1:18083/"). +-define(API_VERSION, "v4"). +-define(BASE_PATH, "api"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx_plugin_libs), + emqx_ct_helpers:start_apps([emqx_modules, emqx_dashboard]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([emqx_modules, emqx_dashboard]). + +t_http_test(_Config) -> + emqx_trace:clear(), + load(), + Header = auth_header_(), + %% list + {ok, Empty} = request_api(get, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(Empty)), + %% create + ErrorTrace = #{}, + {ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace), + ?assertEqual(#{<<"message">> => <<"unknown field: {}">>, + <<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)), + + Name = <<"test-name">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, <<"/x/y/z">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(#{<<"code">> => 0}, json(Create)), + + {ok, List} = request_api(get, api_path("trace"), Header), + #{<<"code">> := 0, <<"data">> := [Data]} = json(List), + ?assertEqual(Name, maps:get(<<"name">>, Data)), + + %% update + {ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}), + ?assertEqual(#{<<"code">> => 0, + <<"data">> => #{<<"enable">> => false, + <<"name">> => <<"test-name">>}}, json(Update)), + + {ok, List1} = request_api(get, api_path("trace"), Header), + #{<<"code">> := 0, <<"data">> := [Data1]} = json(List1), + ?assertEqual(false, maps:get(<<"enable">>, Data1)), + + %% delete + {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(#{<<"code">> => 0}, json(Delete)), + + {ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>, + <<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)), + + {ok, List2} = request_api(get, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)), + + %% clear + {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(#{<<"code">> => 0}, json(Create1)), + + {ok, Clear} = request_api(delete, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0}, json(Clear)), + + unload(), + ok. + +t_stream_log(_Config) -> + application:set_env(emqx, allow_anonymous, true), + emqx_trace:clear(), + load(), + ClientId = <<"client-stream">>, + Now = erlang:system_time(second), + Name = <<"test_stream_log">>, + Start = to_rfc3339(Now - 10), + ok = emqx_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]), + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ct:sleep(200), + File = emqx_trace:log_file(Name, Now), + ct:pal("FileName: ~p", [File]), + {ok, FileBin} = file:read_file(File), + ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), + Header = auth_header_(), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header), + #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary), + ?assertEqual(10, byte_size(Bin)), + ?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta), + Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"), + {ok, Binary1} = request_api(get, Path, Header), + #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1), + ?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1), + ?assertEqual(10, byte_size(Bin1)), + unload(), + ok. + +to_rfc3339(Second) -> + list_to_binary(calendar:system_time_to_rfc3339(Second)). + +auth_header_() -> + auth_header_("admin", "public"). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), + {"Authorization", "Basic " ++ Encoded}. + +request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). + +request_api(Method, Url, Auth, Body) -> + Request = {Url, [Auth], "application/json", emqx_json:encode(Body)}, + do_request_api(Method, Request). + +do_request_api(Method, Request) -> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {error,{shutdown, server_closed}} -> + {error, server_closed}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } + when Code =:= 200 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +api_path(Path) -> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]). + +json(Data) -> + {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. + +load() -> + emqx_trace:start_link(). + +unload() -> + gen_server:stop(emqx_trace).