emqx/apps/emqx_management/src/emqx_mgmt_api_trace.erl

760 lines
24 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_trace).
-behaviour(minirest_api).
-include_lib("kernel/include/file.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-export([
api_spec/0,
fields/1,
paths/0,
schema/1,
namespace/0
]).
-export([
trace/2,
delete_trace/2,
update_trace/2,
download_trace_log/2,
stream_log_file/2,
log_file_detail/2
]).
-export([validate_name/1]).
%% for rpc
-export([
read_trace_file/3,
get_trace_size/0
]).
%% Upper bound of the `bytes' query parameter: the largest signed 32-bit
%% integer (see fields(bytes)).
-define(MAX_SINT32, 2147483647).
%% Flatten an iolist into a binary; used to build response messages.
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
%% Not-found API response whose message reads "<N> NOT FOUND".
%% NOTE(review): ?NOT_FOUND comes from emqx_utils_api.hrl; presumably an
%% HTTP 404 with code 'NOT_FOUND' (matches the 404 entries in schema/1).
-define(NOT_FOUND_WITH_MSG(N), ?NOT_FOUND(?TO_BIN([N, " NOT FOUND"]))).
%% Swagger tags attached to every operation in this module.
-define(TAGS, [<<"Trace">>]).
%% Schema namespace for this API. Presumably consumed by
%% emqx_dashboard_swagger when naming the generated schema refs.
namespace() ->
    "trace".
%% OpenAPI spec for every route in paths/0. Bodies are checked against the
%% schema and translated before they reach the handlers.
api_spec() ->
    SpecOpts = #{check_schema => true, translate_body => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOpts).
%% Routes served by this module: the collection root, followed by the
%% routes scoped to one trace through the `:name' path binding.
paths() ->
    PerTraceSuffixes = ["/stop", "/download", "/log", "/log_detail", ""],
    ["/trace" | ["/trace/:name" ++ Suffix || Suffix <- PerTraceSuffixes]].
%% Swagger/OpenAPI description for each route in paths/0.
%% Each clause maps a route to its `operationId' (a handler exported from
%% this module) and per-method metadata: description, tags, parameters,
%% request body and response schemas.
schema("/trace") ->
    #{
        'operationId' => trace,
        get => #{
            description => ?DESC(list_all),
            tags => ?TAGS,
            responses => #{
                200 => hoconsc:array(hoconsc:ref(trace))
            }
        },
        post => #{
            description => ?DESC(create_new),
            tags => ?TAGS,
            %% `status' and `log_size' are output-only: this module fills
            %% them in when rendering a trace. They are therefore removed
            %% from the request body schema.
            'requestBody' => delete([status, log_size], fields(trace)),
            responses => #{
                200 => hoconsc:ref(trace),
                400 => emqx_dashboard_swagger:error_codes(
                    [
                        'INVALID_PARAMS'
                    ],
                    <<"invalid trace params">>
                ),
                409 => emqx_dashboard_swagger:error_codes(
                    [
                        'ALREADY_EXISTS',
                        'DUPLICATE_CONDITION',
                        'BAD_TYPE'
                    ],
                    <<"trace already exists">>
                )
            }
        },
        delete => #{
            description => ?DESC(clear_all),
            tags => ?TAGS,
            responses => #{
                204 => <<"No Content">>
            }
        }
    };
schema("/trace/:name") ->
    #{
        'operationId' => delete_trace,
        delete => #{
            description => ?DESC(delete_trace),
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                204 => <<"Delete successfully">>,
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
            }
        }
    };
schema("/trace/:name/stop") ->
    #{
        'operationId' => update_trace,
        put => #{
            description => ?DESC(stop_trace),
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                200 => hoconsc:ref(trace),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
            }
        }
    };
schema("/trace/:name/download") ->
    #{
        'operationId' => download_trace_log,
        get => #{
            description => ?DESC(download_log_by_name),
            tags => ?TAGS,
            %% `node' is optional; without it every running node contributes.
            parameters => [hoconsc:ref(name), hoconsc:ref(node)],
            responses => #{
                200 =>
                    #{
                        description => "A trace zip file",
                        content => #{
                            'application/octet-stream' =>
                                #{schema => #{type => "string", format => "binary"}}
                        }
                    },
                404 => emqx_dashboard_swagger:error_codes(
                    ['NOT_FOUND', 'NODE_ERROR'], <<"Trace Name or Node Not Found">>
                )
            }
        }
    };
schema("/trace/:name/log_detail") ->
    #{
        'operationId' => log_file_detail,
        get => #{
            description => ?DESC(get_trace_file_metadata),
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                200 => hoconsc:array(hoconsc:ref(log_file_detail)),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
            }
        }
    };
schema("/trace/:name/log") ->
    #{
        'operationId' => stream_log_file,
        get => #{
            description => ?DESC(view_trace_log),
            tags => ?TAGS,
            %% `bytes' + `position' page through the log; `node' selects
            %% which node's file is read.
            parameters => [
                hoconsc:ref(name),
                hoconsc:ref(bytes),
                hoconsc:ref(position),
                hoconsc:ref(node)
            ],
            responses => #{
                200 =>
                    [
                        {items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})},
                        {meta, fields(bytes) ++ fields(position)}
                    ],
                400 => emqx_dashboard_swagger:error_codes(
                    ['BAD_REQUEST'], <<"Bad input parameter">>
                ),
                404 => emqx_dashboard_swagger:error_codes(
                    ['NOT_FOUND', 'NODE_ERROR'], <<"Trace Name or Node Not Found">>
                ),
                %% Returned when reading the requested chunk fails with
                %% `enomem' (see stream_log_file/2).
                503 => emqx_dashboard_swagger:error_codes(
                    ['SERVICE_UNAVAILABLE'], <<"Requested chunk size too big">>
                )
            }
        }
    }.
%% hocon field definitions referenced from schema/1 via hoconsc:ref/1.
%%
%% log_file_detail  - one entry of the /trace/:name/log_detail response.
%% trace            - the full trace object (POST body minus the
%%                    output-only `status' and `log_size').
%% name             - the `:name' path parameter.
%% node, bytes,
%% position         - query parameters of the download/log endpoints.
fields(log_file_detail) ->
    fields(node) ++
        [
            {size, hoconsc:mk(integer(), #{description => ?DESC(file_size)})},
            {mtime, hoconsc:mk(integer(), #{description => ?DESC(file_mtime)})}
        ];
fields(trace) ->
    [
        {name,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(trace_name),
                    validator => fun ?MODULE:validate_name/1,
                    required => true,
                    example => <<"EMQX-TRACE-1">>
                }
            )},
        %% Selects which of the filter fields below (`topic', `clientid',
        %% `ip_address', `ruleid') carries the filter value.
        {type,
            hoconsc:mk(
                hoconsc:enum([clientid, topic, ip_address, ruleid]),
                #{
                    description => ?DESC(filter_type),
                    required => true,
                    example => <<"clientid">>
                }
            )},
        {topic,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(support_wildcard),
                    required => false,
                    example => <<"/dev/#">>
                }
            )},
        {clientid,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(mqtt_clientid),
                    required => false,
                    example => <<"dev-001">>
                }
            )},
        %% TODO add ip_address type in emqx_schema.erl
        %% NOTE(review): `client_ip_addess' looks misspelled, but it is an
        %% i18n description key; confirm against the description file
        %% before renaming.
        {ip_address,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(client_ip_addess),
                    required => false,
                    example => <<"127.0.0.1">>
                }
            )},
        {ruleid,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(ruleid_field),
                    required => false,
                    example => <<"my_rule">>
                }
            )},
        %% Output-only; computed by status/4 when a trace is rendered.
        {status,
            hoconsc:mk(
                hoconsc:enum([running, stopped, waiting]),
                #{
                    description => ?DESC(trace_status),
                    required => false,
                    example => running
                }
            )},
        {payload_encode,
            hoconsc:mk(hoconsc:enum([hex, text, hidden]), #{
                desc =>
                    ""
                    "Determine the format of the payload format in the trace file.<br/>\n"
                    "`text`: Text-based protocol or plain text protocol.\n"
                    " It is recommended when payload is JSON encoded.<br/>\n"
                    "`hex`: Binary hexadecimal encode."
                    "It is recommended when payload is a custom binary protocol.<br/>\n"
                    "`hidden`: payload is obfuscated as `******`"
                    "",
                default => text
            })},
        {start_at,
            hoconsc:mk(
                emqx_utils_calendar:epoch_second(),
                #{
                    description => ?DESC(time_format),
                    required => false,
                    example => <<"2021-11-04T18:17:38+08:00">>
                }
            )},
        {end_at,
            hoconsc:mk(
                emqx_utils_calendar:epoch_second(),
                #{
                    description => ?DESC(time_format),
                    required => false,
                    example => <<"2021-11-05T18:17:38+08:00">>
                }
            )},
        %% Output-only; per-node size of this trace's log file.
        {log_size,
            hoconsc:mk(
                hoconsc:array(map()),
                #{
                    description => ?DESC(trace_log_size),
                    example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
                    required => false
                }
            )},
        {formatter,
            hoconsc:mk(
                hoconsc:union([text, json]),
                #{
                    description => ?DESC(trace_log_formatter),
                    example => text,
                    required => false
                }
            )}
    ];
fields(name) ->
    [
        {name,
            hoconsc:mk(
                binary(),
                #{
                    desc => <<"[a-zA-Z0-9-_]">>,
                    example => <<"EMQX-TRACE-1">>,
                    in => path,
                    validator => fun ?MODULE:validate_name/1
                }
            )}
    ];
fields(node) ->
    [
        {node,
            hoconsc:mk(
                binary(),
                #{
                    description => ?DESC(node_name),
                    in => query,
                    required => false,
                    example => "emqx@127.0.0.1"
                }
            )}
    ];
fields(bytes) ->
    [
        {bytes,
            hoconsc:mk(
                %% This seems to be the minimum max value we may encounter
                %% across different OS
                range(0, ?MAX_SINT32),
                #{
                    description => ?DESC(max_response_bytes),
                    in => query,
                    required => false,
                    default => 1000
                }
            )}
    ];
fields(position) ->
    [
        %% Byte offset into the log file at which reading starts.
        {position,
            hoconsc:mk(
                integer(),
                #{
                    description => ?DESC(current_trace_offset),
                    in => query,
                    required => false,
                    default => 0
                }
            )}
    ].
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
%% Schema validator for trace names.
%%
%% A valid name is 1..256 bytes long, starts with an ASCII letter and
%% continues with letters, digits, `-' or `_'.
%% Returns `ok' or `{error, Message}'.
validate_name(Name) ->
    Size = byte_size(Name),
    if
        Size < 1; Size > 256 ->
            {error, "Name Length must =< 256"};
        true ->
            case re:run(Name, ?NAME_RE) of
                nomatch -> {error, "Name should be " ?NAME_RE};
                _ -> ok
            end
    end.
%% Remove the first field tagged with each key in `Keys' from the field
%% list `Fields'. Keys are processed left to right.
delete([], Fields) ->
    Fields;
delete([Key | MoreKeys], Fields) ->
    delete(MoreKeys, lists:keydelete(Key, 1, Fields)).
%% Handler for /trace.
%%
%% GET    - list every trace, sorted by descending `start_at'. Each trace
%%          gets per-node log file sizes (gathered over RPC from all
%%          running nodes), RFC 3339 start/end times and a status computed
%%          by status/4.
%% POST   - create a trace from the schema-checked body. Errors from
%%          emqx_trace:create/1 become 409 or 400 responses.
%% DELETE - remove all traces.
trace(get, _Params) ->
    case emqx_trace:list() of
        [] ->
            {200, []};
        List0 ->
            %% Largest start_at first.
            List = lists:sort(
                fun(#{start_at := A}, #{start_at := B}) -> A > B end,
                emqx_trace:format(List0)
            ),
            Nodes = emqx:running_nodes(),
            %% Nodes whose RPC fails are logged and dropped by wrap_rpc/1.
            %% collect_file_size/3 then reports 0 for them.
            TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v2:get_trace_size(Nodes)),
            %% Every node's map is keyed by {Node, File} (see get_trace_size/0),
            %% so merging them cannot lose entries.
            AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
            Now = erlang:system_time(second),
            Traces =
                lists:map(
                    fun(
                        Trace = #{
                            name := Name,
                            start_at := Start,
                            end_at := End,
                            enable := Enable,
                            type := Type,
                            filter := Filter
                        }
                    ) ->
                        FileName = emqx_trace:filename(Name, Start),
                        LogSize = collect_file_size(Nodes, FileName, AllFileSize),
                        Trace0 = maps:without([enable, filter], Trace),
                        %% The filter value is exposed under its type's key,
                        %% e.g. `clientid => <<"dev-001">>'.
                        Trace0#{
                            log_size => LogSize,
                            Type => iolist_to_binary(Filter),
                            start_at => emqx_utils_calendar:epoch_to_rfc3339(Start, second),
                            end_at => emqx_utils_calendar:epoch_to_rfc3339(End, second),
                            status => status(Enable, Start, End, Now)
                        }
                    end,
                    List
                ),
            {200, Traces}
    end;
trace(post, #{body := Param}) ->
    case emqx_trace:create(Param) of
        {ok, Trace0} ->
            {200, format_trace(Trace0)};
        {error, {already_existed, Name}} ->
            {409, #{
                code => 'ALREADY_EXISTS',
                message => ?TO_BIN([Name, " Already Exists"])
            }};
        {error, {duplicate_condition, Name}} ->
            {409, #{
                code => 'DUPLICATE_CONDITION',
                message => ?TO_BIN([Name, " Duplication Condition"])
            }};
        {error, {bad_type, _}} ->
            {409, #{
                code => 'BAD_TYPE',
                message => <<"Rolling upgrade in progress, create failed">>
            }};
        %% NOTE(review): ?TO_BIN requires Reason to be iodata; other terms
        %% would crash here. Confirm emqx_trace:create/1 only returns iodata.
        {error, Reason} ->
            {400, #{
                code => 'INVALID_PARAMS',
                message => ?TO_BIN(Reason)
            }}
    end;
trace(delete, _Param) ->
    ok = emqx_trace:clear(),
    {204}.
%% Render a freshly created trace (as returned by emqx_trace:create/1) in
%% the same shape that the GET /trace listing returns.
%%
%% A new trace has not written any log output yet, so every running node
%% reports a log size of 0. `start_at' and `end_at' are rendered as
%% RFC 3339 strings, and the filter value is exposed under its type's key.
format_trace(Trace0) ->
    [
        #{
            start_at := Start,
            end_at := End,
            enable := Enable,
            type := Type,
            filter := Filter
        } = Trace1
    ] = emqx_trace:format([Trace0]),
    Now = erlang:system_time(second),
    LogSize = maps:from_list([{Node, 0} || Node <- emqx:running_nodes()]),
    Trace2 = maps:without([enable, filter], Trace1),
    Trace2#{
        log_size => LogSize,
        Type => iolist_to_binary(Filter),
        start_at => emqx_utils_calendar:epoch_to_rfc3339(Start, second),
        %% Rendered from End (not Start), matching the GET listing.
        end_at => emqx_utils_calendar:epoch_to_rfc3339(End, second),
        status => status(Enable, Start, End, Now)
    }.
%% DELETE /trace/:name: remove the named trace.
%% Replies 204 on success, or not-found when no such trace exists.
delete_trace(delete, #{bindings := #{name := Name}}) ->
    Result = emqx_trace:delete(Name),
    case Result of
        {error, not_found} ->
            ?NOT_FOUND_WITH_MSG(Name);
        ok ->
            {204}
    end.
%% PUT /trace/:name/stop: disable the named trace.
%% Replies with the minimal view `#{enable => false, name => Name}', or
%% not-found when no such trace exists.
update_trace(put, #{bindings := #{name := Name}}) ->
    case emqx_trace:update(Name, false) of
        {error, not_found} ->
            ?NOT_FOUND_WITH_MSG(Name);
        ok ->
            {200, #{name => Name, enable => false}}
    end.
%% GET /trace/:name/download: return the named trace log as a zip archive.
%%
%% The optional `node' query parameter limits the download to one running
%% node. Without it, the log is collected from every running node.
%% If the HTTP request headers include accept-encoding: gzip and the file
%% is larger than 300 bytes, cowboy_compress_h gzips the response
%% automatically.
download_trace_log(get, #{bindings := #{name := Name}, query_string := Query}) ->
    case emqx_trace:get_trace_filename(Name) of
        {error, not_found} ->
            ?NOT_FOUND_WITH_MSG(Name);
        {ok, TraceLog} ->
            download_from_nodes(Name, TraceLog, parse_node(Query, undefined))
    end.

%% Continue a download once the `node' query parameter has been resolved.
download_from_nodes(Name, TraceLog, {ok, Node}) ->
    maybe_download_trace_log(Name, TraceLog, collect_trace_file(Node, TraceLog));
download_from_nodes(_Name, _TraceLog, {error, not_found}) ->
    ?NOT_FOUND_WITH_MSG(<<"Node">>).
%% Decide the download response from the per-node trace-file results.
%%
%% - If any node returned a non-empty file, zip and return those files,
%%   even if other nodes failed.
%% - Otherwise, if any node failed with something other than `enoent',
%%   return an internal error carrying the failed results.
%% - Otherwise every node reported an empty or missing file: not-found.
%% If no node produced a result at all (for example every RPC target was
%% dropped by wrap_rpc/1), the grouped map is empty. That case is also
%% answered with not-found instead of crashing with case_clause.
maybe_download_trace_log(Name, TraceLog, TraceFiles) ->
    case group_trace_files(TraceLog, TraceFiles) of
        #{nonempty := Files} ->
            do_download_trace_log(Name, TraceLog, Files);
        #{error := Reasons} ->
            ?INTERNAL_ERROR(Reasons);
        #{empty := _} ->
            ?NOT_FOUND(<<"Trace is empty">>);
        #{} ->
            ?NOT_FOUND(<<"Trace is empty">>)
    end.
%% Build an in-memory zip of the given non-empty per-node trace files and
%% return it as an attachment named "<Name>.zip".
%%
%% Each node's log is written into a per-request scratch directory under
%% emqx_trace:zip_dir() and zipped from there. The directory is removed
%% again by the `after' clause, even if zipping raises.
%% NOTE(review): group_trace_file/3 runs before the `try', so a crash
%% there would leave ZipDir behind.
do_download_trace_log(Name, TraceLog, TraceFiles) ->
    %% We generate a session ID so that we name files
    %% with unique names. Then we won't cause
    %% overwrites for concurrent requests.
    SessionId = emqx_utils:gen_id(),
    ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]),
    ok = file:make_dir(ZipDir),
    %% Write files to ZipDir and create an in-memory zip file
    Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
    ZipName = binary_to_list(Name) ++ ".zip",
    Binary =
        try
            {ok, {ZipName, Bin}} = zip:zip(ZipName, Zips, [memory, {cwd, ZipDir}]),
            Bin
        after
            %% emqx_trace:delete_files_after_send(ZipFileName, Zips),
            %% TODO use file replace file_binary.(delete file after send is not ready now).
            ok = file:del_dir_r(ZipDir)
        end,
    %% snabbkaffe trace point; presumably observed by test suites.
    ?tp(trace_api_download_trace_log, #{
        files => Zips,
        name => Name,
        session_id => SessionId,
        zip_dir => ZipDir,
        zip_name => ZipName
    }),
    Headers = #{
        <<"content-type">> => <<"application/x-zip">>,
        <<"content-disposition">> => iolist_to_binary(
            "attachment; filename=" ++ ZipName
        )
    },
    {200, Headers, {file_binary, ZipName, Binary}}.
%% Group per-node trace-file results into a map with (some of) the keys
%% `empty', `nonempty' and `error'.
%%
%% An empty binary or a missing file (`enoent') counts as `empty'. Any
%% other failure is logged and counted as `error'.
group_trace_files(TraceLog, TraceFiles) ->
    Classify =
        fun
            ({ok, _Node, <<>>}) ->
                empty;
            ({error, _Node, enoent}) ->
                empty;
            ({ok, _Node, _Bin}) ->
                nonempty;
            ({error, Node, Reason}) ->
                ?SLOG(error, #{
                    msg => "download_trace_log_error",
                    node => Node,
                    log => TraceLog,
                    reason => Reason
                }),
                error
        end,
    maps:groups_from_list(Classify, TraceFiles).
%% Write each `{ok, Node, Bin}' result into `ZipDir' as "<Node>-<TraceLog>"
%% and return the names of the files that were written successfully.
%% The names come back in reverse input order; failed writes are skipped.
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
    write_zip_entries(ZipDir, TraceLog, TraceFiles, []).

%% Accumulator loop for group_trace_file/3.
write_zip_entries(_ZipDir, _TraceLog, [], Written) ->
    Written;
write_zip_entries(ZipDir, TraceLog, [{ok, Node, Bin} | Rest], Written0) ->
    EntryName = Node ++ "-" ++ TraceLog,
    Written =
        case file:write_file(filename:join([ZipDir, EntryName]), Bin) of
            ok -> [EntryName | Written0];
            _ -> Written0
        end,
    write_zip_entries(ZipDir, TraceLog, Rest, Written).
%% Fetch the trace file `TraceLog' over RPC, either from one node or, when
%% `Node' is `undefined', from every running node. Results from nodes
%% whose RPC failed are dropped (and logged) by wrap_rpc/1.
collect_trace_file(Node, TraceLog) ->
    Targets =
        case Node of
            undefined -> emqx:running_nodes();
            _ -> [Node]
        end,
    wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file(Targets, TraceLog)).
%% Fetch file metadata for `TraceLog' from every running node over RPC.
%% Results from nodes whose RPC failed are dropped by wrap_rpc/1.
collect_trace_file_detail(TraceLog) ->
    wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file_detail(emqx:running_nodes(), TraceLog)).
%% Unwrap a `{GoodResults, BadNodes}' multicall result. Unreachable nodes
%% are logged and otherwise ignored; only the good results are returned.
wrap_rpc({GoodRes, []}) ->
    GoodRes;
wrap_rpc({GoodRes, BadNodes}) ->
    ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
    GoodRes.
%% GET /trace/:name/log_detail: per-node metadata of the named trace's log
%% file, or not-found when the trace does not exist.
log_file_detail(get, #{bindings := #{name := Name}}) ->
    case emqx_trace:get_trace_filename(Name) of
        {error, not_found} ->
            ?NOT_FOUND_WITH_MSG(Name);
        {ok, TraceLog} ->
            Details = group_trace_file_detail(collect_trace_file_detail(TraceLog)),
            {200, Details}
    end.
%% Keep the `Info' of every `{ok, Info}' result (in reverse order) and log
%% every `{error, Error}' result instead of returning it.
group_trace_file_detail(TraceLogDetail) ->
    lists:foldl(fun keep_trace_file_detail/2, [], TraceLogDetail).

%% Fold step for group_trace_file_detail/1.
keep_trace_file_detail({ok, Info}, Kept) ->
    [Info | Kept];
keep_trace_file_detail({error, Error}, Kept) ->
    ?SLOG(error, Error#{msg => "read_trace_file_failed"}),
    Kept.
%% GET /trace/:name/log: return one chunk of a trace log from one node.
%%
%% Query parameters: `position' (byte offset, default 0), `bytes' (maximum
%% chunk size, default 1000) and `node' (defaults to the local node).
%% The response `meta.position' is the offset to send on the next request,
%% so a client can page through the file.
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
    Position = maps:get(<<"position">>, Query, 0),
    Bytes = maps:get(<<"bytes">>, Query, 1000),
    case parse_node(Query, node()) of
        {ok, Node} ->
            case emqx_mgmt_trace_proto_v2:read_trace_file(Node, Name, Position, Bytes) of
                {ok, Bin} ->
                    %% Advance by what was actually read, which may be less
                    %% than `Bytes'.
                    Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
                    {200, #{meta => Meta, items => Bin}};
                {eof, Size} ->
                    %% Nothing left to read: the file size becomes the next
                    %% position.
                    Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
                    {200, #{meta => Meta, items => <<"">>}};
                %% the waiting trace should return "" not error.
                {error, enoent} ->
                    Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
                    {200, #{meta => Meta, items => <<"">>}};
                {error, not_found} ->
                    ?NOT_FOUND_WITH_MSG(Name);
                %% Reading the requested chunk ran out of memory; presumably
                %% `bytes' is too large to allocate.
                {error, enomem} ->
                    ?SLOG(warning, #{
                        code => not_enough_mem,
                        msg => "Requested chunk size too big",
                        bytes => Bytes,
                        name => Name
                    }),
                    ?SERVICE_UNAVAILABLE(<<"Requested chunk size too big">>);
                {badrpc, nodedown} ->
                    ?NOT_FOUND_WITH_MSG(<<"Node">>)
            end;
        {error, not_found} ->
            ?NOT_FOUND_WITH_MSG(<<"Node">>)
    end.
%% RPC target: the size of every entry in this node's trace directory
%% (except "zip"), keyed by `{node(), FileName}' so that the maps from
%% several nodes can be merged safely. If the directory cannot be listed,
%% the result is an empty map.
-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
get_trace_size() ->
    Dir = emqx_trace:trace_dir(),
    case file:list_dir(Dir) of
        {ok, Entries} ->
            ThisNode = node(),
            maps:from_list([
                {{ThisNode, Entry}, filelib:file_size(filename:join(Dir, Entry))}
             || Entry <- lists:delete("zip", Entries)
            ]);
        _ ->
            #{}
    end.
%% RPC target for stream_log_file/2: read up to `Limit' bytes of the named
%% trace's log file on this node, starting at byte `Position'.
%% Returns the lookup error unchanged if the trace is unknown.
-spec read_trace_file(
    binary(),
    non_neg_integer(),
    non_neg_integer()
) ->
    {ok, binary()}
    | {error, _}
    | {eof, non_neg_integer()}.
read_trace_file(Name, Position, Limit) ->
    case emqx_trace:get_trace_filename(Name) of
        {ok, TraceFile} ->
            FullPath = filename:join([emqx_trace:trace_dir(), TraceFile]),
            read_file(FullPath, Position, Limit);
        {error, _} = LookupError ->
            LookupError
    end.
%% Read up to `Bytes' bytes of the file at `Path', starting at `Offset'.
%%
%% Returns `{ok, Bin}', `{eof, FileSize}' when nothing is left to read at
%% `Offset', or `{error, Reason}' from opening or reading. The file is
%% always closed before returning.
read_file(Path, Offset, Bytes) ->
    case file:open(Path, [read, raw, binary]) of
        {ok, Fd} ->
            try
                _ = seek_unless_start(Fd, Offset),
                read_chunk(Fd, Bytes)
            after
                file:close(Fd)
            end;
        {error, _} = OpenError ->
            OpenError
    end.

%% A freshly opened file is already at offset 0, so only seek otherwise.
seek_unless_start(_Fd, 0) -> ok;
seek_unless_start(Fd, Offset) -> file:position(Fd, {bof, Offset}).

%% Read one chunk. At end of file, report the file size instead.
read_chunk(Fd, Bytes) ->
    case file:read(Fd, Bytes) of
        eof ->
            {ok, #file_info{size = Size}} = file:read_file_info(Fd),
            {eof, Size};
        {ok, _Bin} = Chunk ->
            Chunk;
        {error, _Reason} = ReadError ->
            ReadError
    end.
%% Resolve the optional `node' query parameter.
%%
%% Returns `{ok, Default}' when the parameter is absent, and `{ok, Node}'
%% when it names a currently running node. Any failure (not a map, a name
%% that is not an existing atom, a node that is not running) yields
%% `{error, not_found}'. Only existing atoms are looked up, so user input
%% cannot grow the atom table.
parse_node(Query, Default) ->
    try
        lookup_running_node(maps:find(<<"node">>, Query), Default)
    catch
        _:_ ->
            {error, not_found}
    end.

%% Map the `maps:find/2' result to the node to use; crashes on failure so
%% parse_node/2 can convert it to `{error, not_found}'.
lookup_running_node(error, Default) ->
    {ok, Default};
lookup_running_node({ok, NodeBin}, _Default) ->
    Node = binary_to_existing_atom(NodeBin),
    true = lists:member(Node, emqx:running_nodes()),
    {ok, Node}.
%% Build `#{Node => Size}' for `FileName' on each of `Nodes'. Sizes are
%% looked up in `AllFiles' (keyed by `{Node, FileName}'); nodes without an
%% entry report 0.
collect_file_size(Nodes, FileName, AllFiles) ->
    maps:from_list([{Node, maps:get({Node, FileName}, AllFiles, 0)} || Node <- Nodes]).
%% Status of a trace at time `Now' (epoch seconds):
%% disabled, or past its end, is "stopped"; enabled but not yet started is
%% "waiting"; otherwise "running".
status(Enable, Start, End, Now) ->
    case Enable of
        false -> <<"stopped">>;
        true when Now < Start -> <<"waiting">>;
        true when Now >= End -> <<"stopped">>;
        true -> <<"running">>
    end.