452 lines
16 KiB
Erlang
452 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_mgmt_api_trace).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("kernel/include/file.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([ api_spec/0
|
|
, fields/1
|
|
, paths/0
|
|
, schema/1
|
|
, namespace/0
|
|
]).
|
|
|
|
-export([ trace/2
|
|
, delete_trace/2
|
|
, update_trace/2
|
|
, download_trace_log/2
|
|
, stream_log_file/2
|
|
]).
|
|
|
|
-export([validate_name/1]).
|
|
|
|
%% for rpc
|
|
-export([ read_trace_file/3
|
|
, get_trace_size/0
|
|
]).
|
|
|
|
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
|
-define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}).
|
|
|
|
namespace() -> "trace".
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
|
|
|
|
paths() ->
|
|
["/trace", "/trace/:name/stop", "/trace/:name/download", "/trace/:name/log", "/trace/:name"].
|
|
|
|
|
|
schema("/trace") ->
|
|
#{
|
|
'operationId' => trace,
|
|
get => #{
|
|
description => "List all trace",
|
|
responses => #{
|
|
200 => hoconsc:ref(trace)
|
|
}
|
|
},
|
|
post => #{
|
|
description => "Create new trace",
|
|
'requestBody' => delete([status, log_size], fields(trace)),
|
|
responses => #{
|
|
200 => hoconsc:ref(trace)
|
|
}
|
|
},
|
|
delete => #{
|
|
description => "Clear all traces",
|
|
responses => #{
|
|
204 => <<"No Content">>
|
|
}
|
|
}
|
|
};
|
|
schema("/trace/:name") ->
|
|
#{
|
|
'operationId' => delete_trace,
|
|
delete => #{
|
|
description => "Delete trace by name",
|
|
parameters => [hoconsc:ref(name)],
|
|
responses => #{
|
|
204 => <<"Delete successfully">>,
|
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
|
|
}
|
|
}
|
|
};
|
|
schema("/trace/:name/stop") ->
|
|
#{
|
|
'operationId' => update_trace,
|
|
put => #{
|
|
description => "Stop trace by name",
|
|
parameters => [hoconsc:ref(name)],
|
|
responses => #{
|
|
200 => hoconsc:ref(trace),
|
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
|
|
}
|
|
}
|
|
};
|
|
schema("/trace/:name/download") ->
|
|
#{
|
|
'operationId' => download_trace_log,
|
|
get => #{
|
|
description => "Download trace log by name",
|
|
parameters => [hoconsc:ref(name)],
|
|
responses => #{
|
|
200 =>
|
|
#{description => "A trace zip file",
|
|
content => #{
|
|
'application/octet-stream' =>
|
|
#{schema => #{type => "string", format => "binary"}}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
schema("/trace/:name/log") ->
|
|
#{
|
|
'operationId' => stream_log_file,
|
|
get => #{
|
|
description => "view trace log",
|
|
parameters => [
|
|
hoconsc:ref(name),
|
|
hoconsc:ref(bytes),
|
|
hoconsc:ref(position),
|
|
hoconsc:ref(node)
|
|
],
|
|
responses => #{
|
|
200 =>
|
|
[
|
|
{items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})}
|
|
| fields(bytes) ++ fields(position)
|
|
]
|
|
}
|
|
}
|
|
}.
|
|
|
|
fields(trace) ->
|
|
[
|
|
{name, hoconsc:mk(binary(),
|
|
#{desc => "Unique and format by [a-zA-Z0-9-_]",
|
|
validator => fun ?MODULE:validate_name/1,
|
|
nullable => false,
|
|
example => <<"EMQX-TRACE-1">>})},
|
|
{type, hoconsc:mk(hoconsc:enum([clientid, topic, ip_address]),
|
|
#{desc => """Filter type""",
|
|
nullable => false,
|
|
example => <<"clientid">>})},
|
|
{topic, hoconsc:mk(binary(),
|
|
#{desc => """support mqtt wildcard topic.""",
|
|
nullable => true,
|
|
example => <<"/dev/#">>})},
|
|
{clientid, hoconsc:mk(binary(),
|
|
#{desc => """mqtt clientid.""",
|
|
nullable => true,
|
|
example => <<"dev-001">>})},
|
|
%% TODO add ip_address type in emqx_schema.erl
|
|
{ip_address, hoconsc:mk(binary(),
|
|
#{desc => "client ip address",
|
|
nullable => true,
|
|
example => <<"127.0.0.1">>
|
|
})},
|
|
{status, hoconsc:mk(hoconsc:enum([running, stopped, waiting]),
|
|
#{desc => "trace status",
|
|
nullable => true,
|
|
example => running
|
|
})},
|
|
{start_at, hoconsc:mk(binary(),
|
|
#{desc => "rfc3339 timestamp",
|
|
nullable => true,
|
|
example => <<"2021-11-04T18:17:38+08:00">>
|
|
})},
|
|
{end_at, hoconsc:mk(binary(),
|
|
#{desc => "rfc3339 timestamp",
|
|
nullable => true,
|
|
example => <<"2021-11-05T18:17:38+08:00">>
|
|
})},
|
|
{log_size, hoconsc:mk(hoconsc:array(map()),
|
|
#{desc => "trace log size",
|
|
example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
|
|
nullable => true})}
|
|
];
|
|
fields(name) ->
|
|
[{name, hoconsc:mk(binary(),
|
|
#{
|
|
desc => <<"[a-zA-Z0-9-_]">>,
|
|
example => <<"EMQX-TRACE-1">>,
|
|
in => path,
|
|
validator => fun ?MODULE:validate_name/1
|
|
})}
|
|
];
|
|
fields(node) ->
|
|
[{node, hoconsc:mk(binary(),
|
|
#{
|
|
desc => "Node name",
|
|
in => query,
|
|
nullable => true
|
|
})}];
|
|
fields(bytes) ->
|
|
[{bytes, hoconsc:mk(integer(),
|
|
#{
|
|
desc => "Maximum number of bytes to store in request",
|
|
in => query,
|
|
nullable => true,
|
|
default => 1000
|
|
})}];
|
|
fields(position) ->
|
|
[{position, hoconsc:mk(integer(),
|
|
#{
|
|
desc => "Offset from the current trace position.",
|
|
in => query,
|
|
nullable => true,
|
|
default => 0
|
|
})}].
|
|
|
|
|
|
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
|
|
|
|
validate_name(Name) ->
|
|
NameLen = byte_size(Name),
|
|
case NameLen > 0 andalso NameLen =< 256 of
|
|
true ->
|
|
case re:run(Name, ?NAME_RE) of
|
|
nomatch -> {error, "Name should be " ?NAME_RE};
|
|
_ -> ok
|
|
end;
|
|
false -> {error, "Name Length must =< 256"}
|
|
end.
|
|
|
|
delete(Keys, Fields) ->
|
|
lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys).
|
|
|
|
trace(get, _Params) ->
|
|
case emqx_trace:list() of
|
|
[] -> {200, []};
|
|
List0 ->
|
|
List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end,
|
|
emqx_trace:format(List0)),
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v1:get_trace_size(Nodes)),
|
|
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
|
|
Now = erlang:system_time(second),
|
|
Traces =
|
|
lists:map(fun(Trace = #{name := Name, start_at := Start,
|
|
end_at := End, enable := Enable, type := Type, filter := Filter}) ->
|
|
FileName = emqx_trace:filename(Name, Start),
|
|
LogSize = collect_file_size(Nodes, FileName, AllFileSize),
|
|
Trace0 = maps:without([enable, filter], Trace),
|
|
Trace0#{log_size => LogSize
|
|
, Type => iolist_to_binary(Filter)
|
|
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
|
|
, end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
|
|
, status => status(Enable, Start, End, Now)
|
|
}
|
|
end, List),
|
|
{200, Traces}
|
|
end;
|
|
trace(post, #{body := Param}) ->
|
|
case emqx_trace:create(Param) of
|
|
{ok, Trace0} -> {200, format_trace(Trace0)};
|
|
{error, {already_existed, Name}} ->
|
|
{400, #{
|
|
code => 'ALREADY_EXISTED',
|
|
message => ?TO_BIN([Name, " Already Exists"])
|
|
}};
|
|
{error, {duplicate_condition, Name}} ->
|
|
{400, #{
|
|
code => 'DUPLICATE_CONDITION',
|
|
message => ?TO_BIN([Name, " Duplication Condition"])
|
|
}};
|
|
{error, Reason} ->
|
|
{400, #{
|
|
code => 'INCORRECT_PARAMS',
|
|
message => ?TO_BIN(Reason)
|
|
}}
|
|
end;
|
|
trace(delete, _Param) ->
|
|
ok = emqx_trace:clear(),
|
|
{204}.
|
|
|
|
format_trace(Trace0) ->
|
|
[
|
|
#{start_at := Start, end_at := End,
|
|
enable := Enable, type := Type, filter := Filter} = Trace1
|
|
] = emqx_trace:format([Trace0]),
|
|
Now = erlang:system_time(second),
|
|
LogSize = lists:foldl(fun(Node, Acc) -> Acc#{Node => 0} end, #{},
|
|
mria_mnesia:running_nodes()),
|
|
Trace2 = maps:without([enable, filter], Trace1),
|
|
Trace2#{log_size => LogSize
|
|
, Type => iolist_to_binary(Filter)
|
|
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
|
|
, end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
|
|
, status => status(Enable, Start, End, Now)
|
|
}.
|
|
|
|
delete_trace(delete, #{bindings := #{name := Name}}) ->
|
|
case emqx_trace:delete(Name) of
|
|
ok -> {204};
|
|
{error, not_found} -> ?NOT_FOUND(Name)
|
|
end.
|
|
|
|
update_trace(put, #{bindings := #{name := Name}}) ->
|
|
case emqx_trace:update(Name, false) of
|
|
ok -> {200, #{enable => false, name => Name}};
|
|
{error, not_found} -> ?NOT_FOUND(Name)
|
|
end.
|
|
|
|
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
|
%% cowboy_compress_h will auto encode gzip format.
|
|
download_trace_log(get, #{bindings := #{name := Name}}) ->
|
|
case emqx_trace:get_trace_filename(Name) of
|
|
{ok, TraceLog} ->
|
|
TraceFiles = collect_trace_file(TraceLog),
|
|
ZipDir = emqx_trace:zip_dir(),
|
|
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
|
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
|
|
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
|
|
%% emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
|
%% TODO use file replace file_binary.(delete file after send is not ready now).
|
|
{ok, Binary} = file:read_file(ZipFile),
|
|
ZipName = filename:basename(ZipFile),
|
|
_ = file:delete(ZipFile),
|
|
Headers = #{
|
|
<<"content-type">> => <<"application/x-zip">>,
|
|
<<"content-disposition">> => iolist_to_binary("attachment; filename=" ++ ZipName)
|
|
},
|
|
{200, Headers, {file_binary, ZipName, Binary}};
|
|
{error, not_found} -> ?NOT_FOUND(Name)
|
|
end.
|
|
|
|
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
|
lists:foldl(fun(Res, Acc) ->
|
|
case Res of
|
|
{ok, Node, Bin} ->
|
|
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
|
case file:write_file(ZipName, Bin) of
|
|
ok -> [Node ++ "-" ++ TraceLog | Acc];
|
|
_ -> Acc
|
|
end;
|
|
{error, Node, Reason} ->
|
|
?SLOG(error, #{msg => "download_trace_log_error", node => Node, log => TraceLog, reason => Reason}),
|
|
Acc
|
|
end
|
|
end, [], TraceFiles).
|
|
|
|
collect_trace_file(TraceLog) ->
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
wrap_rpc(emqx_mgmt_trace_proto_v1:trace_file(Nodes, TraceLog)).
|
|
|
|
wrap_rpc({GoodRes, BadNodes}) ->
|
|
BadNodes =/= [] andalso
|
|
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
|
|
GoodRes.
|
|
|
|
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
|
|
Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())),
|
|
Position = maps:get(<<"position">>, Query, 0),
|
|
Bytes = maps:get(<<"bytes">>, Query, 1000),
|
|
case to_node(Node0) of
|
|
{ok, Node} ->
|
|
case emqx_mgmt_trace_proto_v1:read_trace_file(Node, Name, Position, Bytes) of
|
|
{ok, Bin} ->
|
|
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
|
{200, #{meta => Meta, items => Bin}};
|
|
{eof, Size} ->
|
|
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
|
{200, #{meta => Meta, items => <<"">>}};
|
|
{error, enoent} -> %% the waiting trace should return "" not error.
|
|
Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
|
|
{200, #{meta => Meta, items => <<"">>}};
|
|
{error, Reason} ->
|
|
?SLOG(error, #{msg => "read_file_failed",
|
|
node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}),
|
|
{400, #{code => 'READ_FILE_ERROR', message => Reason}};
|
|
{badrpc, nodedown} ->
|
|
{400, #{code => 'RPC_ERROR', message => "BadRpc node down"}}
|
|
end;
|
|
{error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
|
|
end.
|
|
|
|
-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
|
|
get_trace_size() ->
|
|
TraceDir = emqx_trace:trace_dir(),
|
|
Node = node(),
|
|
case file:list_dir(TraceDir) of
|
|
{ok, AllFiles} ->
|
|
lists:foldl(fun(File, Acc) ->
|
|
FullFileName = filename:join(TraceDir, File),
|
|
Acc#{{Node, File} => filelib:file_size(FullFileName)}
|
|
end, #{}, lists:delete("zip", AllFiles));
|
|
_ -> #{}
|
|
end.
|
|
|
|
%% this is an rpc call for stream_log_file/2
|
|
-spec read_trace_file( binary()
|
|
, non_neg_integer()
|
|
, non_neg_integer()
|
|
) -> {ok, binary()}
|
|
| {error, _}
|
|
| {eof, non_neg_integer()}.
|
|
read_trace_file(Name, Position, Limit) ->
|
|
case emqx_trace:get_trace_filename(Name) of
|
|
{error, _} = Error -> Error;
|
|
{ok, TraceFile} ->
|
|
TraceDir = emqx_trace:trace_dir(),
|
|
TracePath = filename:join([TraceDir, TraceFile]),
|
|
read_file(TracePath, Position, Limit)
|
|
end.
|
|
|
|
read_file(Path, Offset, Bytes) ->
|
|
case file:open(Path, [read, raw, binary]) of
|
|
{ok, IoDevice} ->
|
|
try
|
|
_ = case Offset of
|
|
0 -> ok;
|
|
_ -> file:position(IoDevice, {bof, Offset})
|
|
end,
|
|
case file:read(IoDevice, Bytes) of
|
|
{ok, Bin} -> {ok, Bin};
|
|
{error, Reason} -> {error, Reason};
|
|
eof ->
|
|
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
|
{eof, Size}
|
|
end
|
|
after
|
|
file:close(IoDevice)
|
|
end;
|
|
{error, Reason} -> {error, Reason}
|
|
end.
|
|
|
|
to_node(Node) ->
|
|
try {ok, binary_to_existing_atom(Node)}
|
|
catch _:_ ->
|
|
{error, not_found}
|
|
end.
|
|
|
|
collect_file_size(Nodes, FileName, AllFiles) ->
|
|
lists:foldl(fun(Node, Acc) ->
|
|
Size = maps:get({Node, FileName}, AllFiles, 0),
|
|
Acc#{Node => Size}
|
|
end, #{}, Nodes).
|
|
|
|
status(false, _Start, _End, _Now) -> <<"stopped">>;
|
|
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
|
|
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
|
|
status(true, _Start, _End, _Now) -> <<"running">>.
|