%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_mgmt_api_trace). -behaviour(minirest_api). -include_lib("kernel/include/file.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ api_spec/0 , fields/1 , paths/0 , schema/1 , namespace/0 ]). -export([ trace/2 , delete_trace/2 , update_trace/2 , download_trace_log/2 , stream_log_file/2 ]). -export([validate_name/1]). %% for rpc -export([ read_trace_file/3 , get_trace_size/0 ]). -define(TO_BIN(_B_), iolist_to_binary(_B_)). -define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}). namespace() -> "trace". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). paths() -> ["/trace", "/trace/:name/stop", "/trace/:name/download", "/trace/:name/log", "/trace/:name"]. schema("/trace") -> #{ 'operationId' => trace, get => #{ description => "List all trace", responses => #{ 200 => hoconsc:ref(trace) } }, post => #{ description => "Create new trace", 'requestBody' => delete([status, log_size], fields(trace)), responses => #{ 200 => hoconsc:ref(trace) } }, delete => #{ description => "Clear all traces", responses => #{ 204 => <<"No Content">> } } }; schema("/trace/:name") -> #{ 'operationId' => delete_trace, delete => #{ description => "Delete trace by name", parameters => [hoconsc:ref(name)], responses => #{ 204 => <<"Delete successfully">>, 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>) } } }; schema("/trace/:name/stop") -> #{ 'operationId' => update_trace, put => #{ description => "Stop trace by name", parameters => [hoconsc:ref(name)], responses => #{ 200 => hoconsc:ref(trace), 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>) } } }; schema("/trace/:name/download") -> #{ 'operationId' => download_trace_log, get => #{ description => "Download trace log by name", parameters => [hoconsc:ref(name)], responses => #{ 200 => #{description => "A trace zip file", content => #{ 'application/octet-stream' => #{schema => #{type => "string", format => "binary"}} } } } } }; schema("/trace/:name/log") -> #{ 'operationId' => stream_log_file, get => #{ description => "view trace log", parameters => [ hoconsc:ref(name), hoconsc:ref(bytes), hoconsc:ref(position), hoconsc:ref(node) ], responses => #{ 200 => [ {items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})} | fields(bytes) ++ fields(position) ] } } }. fields(trace) -> [ {name, hoconsc:mk(binary(), #{desc => "Unique and format by [a-zA-Z0-9-_]", validator => fun ?MODULE:validate_name/1, nullable => false, example => <<"EMQX-TRACE-1">>})}, {type, hoconsc:mk(hoconsc:enum([clientid, topic, ip_address]), #{desc => """Filter type""", nullable => false, example => <<"clientid">>})}, {topic, hoconsc:mk(binary(), #{desc => """support mqtt wildcard topic.""", nullable => true, example => <<"/dev/#">>})}, {clientid, hoconsc:mk(binary(), #{desc => """mqtt clientid.""", nullable => true, example => <<"dev-001">>})}, %% TODO add ip_address type in emqx_schema.erl {ip_address, hoconsc:mk(binary(), #{desc => "client ip address", nullable => true, example => <<"127.0.0.1">> })}, {status, hoconsc:mk(hoconsc:enum([running, stopped, waiting]), #{desc => "trace status", nullable => true, example => running })}, {start_at, hoconsc:mk(binary(), #{desc => "rfc3339 timestamp", nullable => true, example => <<"2021-11-04T18:17:38+08:00">> })}, {end_at, hoconsc:mk(binary(), #{desc => "rfc3339 timestamp", nullable => true, example => <<"2021-11-05T18:17:38+08:00">> })}, {log_size, hoconsc:mk(hoconsc:array(map()), #{desc => "trace log size", example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], nullable => true})} ]; fields(name) -> [{name, hoconsc:mk(binary(), #{ desc => <<"[a-zA-Z0-9-_]">>, example => <<"EMQX-TRACE-1">>, in => path, validator => fun ?MODULE:validate_name/1 })} ]; fields(node) -> [{node, hoconsc:mk(binary(), #{ desc => "Node name", in => query, nullable => true })}]; fields(bytes) -> [{bytes, hoconsc:mk(integer(), #{ desc => "Maximum number of bytes to store in request", in => query, nullable => true, default => 1000 })}]; fields(position) -> [{position, hoconsc:mk(integer(), #{ desc => "Offset from the current trace position.", in => query, nullable => true, default => 0 })}]. -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). validate_name(Name) -> NameLen = byte_size(Name), case NameLen > 0 andalso NameLen =< 256 of true -> case re:run(Name, ?NAME_RE) of nomatch -> {error, "Name should be " ?NAME_RE}; _ -> ok end; false -> {error, "Name Length must =< 256"} end. delete(Keys, Fields) -> lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys). trace(get, _Params) -> case emqx_trace:list() of [] -> {200, []}; List0 -> List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, emqx_trace:format(List0)), Nodes = mria_mnesia:running_nodes(), TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v1:get_trace_size(Nodes)), AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), Now = erlang:system_time(second), Traces = lists:map(fun(Trace = #{name := Name, start_at := Start, end_at := End, enable := Enable, type := Type, filter := Filter}) -> FileName = emqx_trace:filename(Name, Start), LogSize = collect_file_size(Nodes, FileName, AllFileSize), Trace0 = maps:without([enable, filter], Trace), Trace0#{log_size => LogSize , Type => iolist_to_binary(Filter) , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) , status => status(Enable, Start, End, Now) } end, List), {200, Traces} end; trace(post, #{body := Param}) -> case emqx_trace:create(Param) of {ok, Trace0} -> {200, format_trace(Trace0)}; {error, {already_existed, Name}} -> {400, #{ code => 'ALREADY_EXISTED', message => ?TO_BIN([Name, " Already Exists"]) }}; {error, {duplicate_condition, Name}} -> {400, #{ code => 'DUPLICATE_CONDITION', message => ?TO_BIN([Name, " Duplication Condition"]) }}; {error, Reason} -> {400, #{ code => 'INCORRECT_PARAMS', message => ?TO_BIN(Reason) }} end; trace(delete, _Param) -> ok = emqx_trace:clear(), {204}. format_trace(Trace0) -> [ #{start_at := Start, end_at := End, enable := Enable, type := Type, filter := Filter} = Trace1 ] = emqx_trace:format([Trace0]), Now = erlang:system_time(second), LogSize = lists:foldl(fun(Node, Acc) -> Acc#{Node => 0} end, #{}, mria_mnesia:running_nodes()), Trace2 = maps:without([enable, filter], Trace1), Trace2#{log_size => LogSize , Type => iolist_to_binary(Filter) , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) , status => status(Enable, Start, End, Now) }. delete_trace(delete, #{bindings := #{name := Name}}) -> case emqx_trace:delete(Name) of ok -> {204}; {error, not_found} -> ?NOT_FOUND(Name) end. update_trace(put, #{bindings := #{name := Name}}) -> case emqx_trace:update(Name, false) of ok -> {200, #{enable => false, name => Name}}; {error, not_found} -> ?NOT_FOUND(Name) end. %% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. %% cowboy_compress_h will auto encode gzip format. download_trace_log(get, #{bindings := #{name := Name}}) -> case emqx_trace:get_trace_filename(Name) of {ok, TraceLog} -> TraceFiles = collect_trace_file(TraceLog), ZipDir = emqx_trace:zip_dir(), Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), %% emqx_trace:delete_files_after_send(ZipFileName, Zips), %% TODO use file replace file_binary.(delete file after send is not ready now). {ok, Binary} = file:read_file(ZipFile), ZipName = filename:basename(ZipFile), _ = file:delete(ZipFile), Headers = #{ <<"content-type">> => <<"application/x-zip">>, <<"content-disposition">> => iolist_to_binary("attachment; filename=" ++ ZipName) }, {200, Headers, {file_binary, ZipName, Binary}}; {error, not_found} -> ?NOT_FOUND(Name) end. group_trace_file(ZipDir, TraceLog, TraceFiles) -> lists:foldl(fun(Res, Acc) -> case Res of {ok, Node, Bin} -> ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, case file:write_file(ZipName, Bin) of ok -> [Node ++ "-" ++ TraceLog | Acc]; _ -> Acc end; {error, Node, Reason} -> ?SLOG(error, #{msg => "download_trace_log_error", node => Node, log => TraceLog, reason => Reason}), Acc end end, [], TraceFiles). collect_trace_file(TraceLog) -> Nodes = mria_mnesia:running_nodes(), wrap_rpc(emqx_mgmt_trace_proto_v1:trace_file(Nodes, TraceLog)). wrap_rpc({GoodRes, BadNodes}) -> BadNodes =/= [] andalso ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}), GoodRes. stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())), Position = maps:get(<<"position">>, Query, 0), Bytes = maps:get(<<"bytes">>, Query, 1000), case to_node(Node0) of {ok, Node} -> case emqx_mgmt_trace_proto_v1:read_trace_file(Node, Name, Position, Bytes) of {ok, Bin} -> Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, {200, #{meta => Meta, items => Bin}}; {eof, Size} -> Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, {200, #{meta => Meta, items => <<"">>}}; {error, enoent} -> %% the waiting trace should return "" not error. Meta = #{<<"position">> => Position, <<"bytes">> => Bytes}, {200, #{meta => Meta, items => <<"">>}}; {error, Reason} -> ?SLOG(error, #{msg => "read_file_failed", node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}), {400, #{code => 'READ_FILE_ERROR', message => Reason}}; {badrpc, nodedown} -> {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}} end; {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}} end. -spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}. get_trace_size() -> TraceDir = emqx_trace:trace_dir(), Node = node(), case file:list_dir(TraceDir) of {ok, AllFiles} -> lists:foldl(fun(File, Acc) -> FullFileName = filename:join(TraceDir, File), Acc#{{Node, File} => filelib:file_size(FullFileName)} end, #{}, lists:delete("zip", AllFiles)); _ -> #{} end. %% this is an rpc call for stream_log_file/2 -spec read_trace_file( binary() , non_neg_integer() , non_neg_integer() ) -> {ok, binary()} | {error, _} | {eof, non_neg_integer()}. read_trace_file(Name, Position, Limit) -> case emqx_trace:get_trace_filename(Name) of {error, _} = Error -> Error; {ok, TraceFile} -> TraceDir = emqx_trace:trace_dir(), TracePath = filename:join([TraceDir, TraceFile]), read_file(TracePath, Position, Limit) end. read_file(Path, Offset, Bytes) -> case file:open(Path, [read, raw, binary]) of {ok, IoDevice} -> try _ = case Offset of 0 -> ok; _ -> file:position(IoDevice, {bof, Offset}) end, case file:read(IoDevice, Bytes) of {ok, Bin} -> {ok, Bin}; {error, Reason} -> {error, Reason}; eof -> {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), {eof, Size} end after file:close(IoDevice) end; {error, Reason} -> {error, Reason} end. to_node(Node) -> try {ok, binary_to_existing_atom(Node)} catch _:_ -> {error, not_found} end. collect_file_size(Nodes, FileName, AllFiles) -> lists:foldl(fun(Node, Acc) -> Size = maps:get({Node, FileName}, AllFiles, 0), Acc#{Node => Size} end, #{}, Nodes). status(false, _Start, _End, _Now) -> <<"stopped">>; status(true, Start, _End, Now) when Now < Start -> <<"waiting">>; status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; status(true, _Start, _End, _Now) -> <<"running">>.