Merge pull request #6672 from zhongwencool/return-ok-when-trace-not-found

fix: do not log an error when the trace file does not exist (enoent).
This commit is contained in:
zhongwencool 2022-01-07 15:36:03 +08:00 committed by GitHub
commit 6ea51692c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 252 deletions

View File

@ -19,6 +19,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-export([mnesia/1]). -export([mnesia/1]).
@ -261,6 +262,7 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
Traces = get_enable_trace(), Traces = get_enable_trace(),
NextTRef = update_trace(Traces), NextTRef = update_trace(Traces),
update_trace_handler(), update_trace_handler(),
?tp(update_trace_done, #{}),
{noreply, State#{timer => NextTRef}}; {noreply, State#{timer => NextTRef}};
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->

View File

@ -1,228 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace_api).
-include_lib("emqx/include/logger.hrl").
-include_lib("kernel/include/file.hrl").
%% API
-export([ list_trace/2
, create_trace/2
, update_trace/2
, delete_trace/2
, clear_traces/2
, download_zip_log/2
, stream_log_file/2
]).
-export([ read_trace_file/3
, get_trace_size/0
]).
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}).
%% @doc List every trace returned by emqx_trace:list/0, ordered by `start_at'
%% descending. Each entry is the formatted trace map without the `enable' and
%% `filter' keys, extended with:
%%   `log_size'  - map of node => size of that trace's log file on the node,
%%   Type        - the trace type used as a key, holding the filter value,
%%   `start_at' / `end_at' - RFC 3339 binaries,
%%   `status'    - see status/4.
%% Both arguments are ignored.
list_trace(_, _Params) ->
    case emqx_trace:list() of
        [] ->
            {ok, []};
        Unsorted ->
            NewestFirst = fun(#{start_at := A}, #{start_at := B}) -> A > B end,
            Sorted = lists:sort(NewestFirst, Unsorted),
            Nodes = mria_mnesia:running_nodes(),
            %% One size map per node; later maps win on key clashes.
            PerNodeSizes = cluster_call(?MODULE, get_trace_size, [], 30000),
            AllFileSize = lists:foldl(
                            fun(Sizes, Merged) -> maps:merge(Merged, Sizes) end,
                            #{}, PerNodeSizes),
            Now = erlang:system_time(second),
            Decorate =
                fun(#{name := Name, start_at := Start, end_at := End,
                      enable := Enable, type := Type, filter := Filter} = Trace) ->
                        FileName = emqx_trace:filename(Name, Start),
                        LogSize = collect_file_size(Nodes, FileName, AllFileSize),
                        Public = maps:without([enable, filter], Trace),
                        Public#{ log_size => LogSize
                               , Type => Filter
                               , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
                               , end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
                               , status => status(Enable, Start, End, Now)
                               }
                end,
            {ok, lists:map(Decorate, emqx_trace:format(Sorted))}
    end.
%% @doc Create a trace from `Param' via emqx_trace:create/1.
%% Returns `ok' on success, otherwise `{error, Code, Message}' where `Code' is
%% an atom and `Message' is a binary. The first argument is ignored.
create_trace(_, Param) ->
    case emqx_trace:create(Param) of
        ok ->
            ok;
        {error, {already_existed, Name}} ->
            %% Keep a separating space (as ?NOT_FOUND does) so the name and
            %% the text do not run together in the message.
            {error, 'ALREADY_EXISTED', ?TO_BIN([Name, " Already Exists"])};
        {error, {duplicate_condition, Name}} ->
            {error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, " Duplication Condition"])};
        {error, Reason} ->
            %% NOTE(review): assumes Reason is iodata; ?TO_BIN raises badarg
            %% otherwise - confirm against emqx_trace:create/1.
            {error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}
    end.
%% @doc Delete the trace called `Name'.
%% Returns `ok', or the ?NOT_FOUND error triple when no such trace exists.
delete_trace(#{name := Name}, _Param) ->
    Result = emqx_trace:delete(Name),
    case Result of
        {error, not_found} -> ?NOT_FOUND(Name);
        ok -> ok
    end.
%% @doc Delegate to emqx_trace:clear/0 and return its result unchanged.
%% Both arguments are ignored.
clear_traces(_Bindings, _Params) ->
    emqx_trace:clear().
%% @doc Enable or disable the trace called `Name'.
%% `Operation' must be the atom `enable' or `disable'; anything else crashes.
%% Returns `{ok, #{enable => boolean(), name => Name}}' or the ?NOT_FOUND
%% error triple.
update_trace(#{name := Name, operation := Operation}, _Param) ->
    Enable =
        case Operation of
            enable -> true;
            disable -> false
        end,
    case emqx_trace:update(Name, Enable) of
        {error, not_found} ->
            ?NOT_FOUND(Name);
        ok ->
            {ok, #{name => Name, enable => Enable}}
    end.
%% @doc Collect the log of trace `Name' from the cluster and pack the copies
%% into `<ZipDir><Name>.zip'. Returns `{ok, ZipFile}' or the `{error, Reason}'
%% reported by emqx_trace:get_trace_filename/1.
%%
%% When the HTTP request carries `accept-encoding: gzip' and the file is
%% larger than 300 bytes, cowboy_compress_h gzip-encodes the response
%% automatically.
download_zip_log(#{name := Name}, _Param) ->
    case emqx_trace:get_trace_filename(Name) of
        {error, _Reason} = Error ->
            Error;
        {ok, TraceLog} ->
            PerNodeFiles = collect_trace_file(TraceLog),
            ZipDir = emqx_trace:zip_dir(),
            Entries = group_trace_file(ZipDir, TraceLog, PerNodeFiles),
            Target = ZipDir ++ binary_to_list(Name) ++ ".zip",
            {ok, ZipFile} = zip:zip(Target, Entries, [{cwd, ZipDir}]),
            emqx_trace:delete_files_after_send(Target, Entries),
            {ok, ZipFile}
    end.
%% @doc Write each successfully fetched per-node trace log into `ZipDir' as
%% `<Node>-<TraceLog>' and return the names of the files that were written
%% (relative to `ZipDir'). Failed fetches and failed writes are logged and
%% left out of the result.
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
    AddResult =
        fun(Result, Written) ->
                case Result of
                    {error, Node, Reason} ->
                        ?SLOG(error, #{
                            msg => "download_trace_log_failed",
                            node => Node,
                            trace_log => TraceLog,
                            reason => Reason
                        }),
                        Written;
                    {ok, Node, Bin} ->
                        EntryName = Node ++ "-" ++ TraceLog,
                        ZipName = ZipDir ++ EntryName,
                        case file:write_file(ZipName, Bin) of
                            ok ->
                                [EntryName | Written];
                            Error ->
                                ?SLOG(error, #{
                                    msg => "write_file_failed",
                                    error => Error,
                                    zip_name => ZipName,
                                    byte_size => byte_size(Bin)}),
                                Written
                        end
                end
        end,
    lists:foldl(AddResult, [], TraceFiles).
%% @doc Call emqx_trace:trace_file(TraceLog) on every running node, with a
%% 60 second rpc timeout, and return the collected results.
collect_trace_file(TraceLog) ->
    Timeout = 60000,
    cluster_call(emqx_trace, trace_file, [TraceLog], Timeout).
%% @doc Run `Mod:Fun(Args...)' on every running node via rpc:multicall/5.
%% Returns the list of results from the multicall; nodes it reports as bad
%% are logged as an error and otherwise ignored.
cluster_call(Mod, Fun, Args, Timeout) ->
    {Replies, Unreachable} =
        rpc:multicall(mria_mnesia:running_nodes(), Mod, Fun, Args, Timeout),
    case Unreachable of
        [] ->
            ok;
        _ ->
            ?SLOG(error, #{
                msg => "rpc_call_failed",
                bad_nodes => Unreachable,
                mfa => {Mod, Fun, Args }
            })
    end,
    Replies.
%% @doc Read a chunk of the log of trace `Name' from one node.
%%
%% `Params' is a proplist with binary keys:
%%   <<"node">>     - node to read from (default: the local node),
%%   <<"position">> - byte offset to start at, as a decimal binary (default <<"0">>),
%%   <<"bytes">>    - maximum number of bytes to read (default <<"1000">>).
%%
%% Returns `{ok, #{meta => Meta, items => Bin}}' where `Meta' holds the next
%% position and the requested byte count, or `{error, Reason}'.
%% NOTE(review): non-numeric <<"position">>/<<"bytes">> values still raise
%% badarg in binary_to_integer/1 - confirm whether callers validate them.
stream_log_file(#{name := Name}, Params) ->
    Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
    Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
    Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
    case to_node(Node0) of
        {ok, Node} ->
            Position = binary_to_integer(Position0),
            Bytes = binary_to_integer(Bytes0),
            case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
                {ok, Bin} ->
                    Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
                    {ok, #{meta => Meta, items => Bin}};
                {eof, Size} ->
                    Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
                    {ok, #{meta => Meta, items => <<"">>}};
                {error, Reason} ->
                    ?SLOG(error, #{
                        msg => "read_file_failed",
                        node => Node,
                        name => Name,
                        reason => Reason,
                        position => Position,
                        bytes => Bytes
                    }),
                    {error, Reason};
                {badrpc, nodedown} ->
                    {error, "BadRpc node down"};
                {badrpc, Reason} ->
                    %% rpc:call/4 reports timeouts and remote crashes as
                    %% {badrpc, _} too; return an error instead of crashing
                    %% with case_clause.
                    ?SLOG(error, #{
                        msg => "read_file_rpc_failed",
                        node => Node,
                        name => Name,
                        reason => Reason
                    }),
                    {error, {badrpc, Reason}}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
%% @doc Return a map of `{node(), FileName} => Size' for every entry in the
%% local trace directory except the one named "zip". Returns an empty map when
%% the directory cannot be listed.
get_trace_size() ->
    TraceDir = emqx_trace:trace_dir(),
    Node = node(),
    case file:list_dir(TraceDir) of
        {ok, AllFiles} ->
            maps:from_list(
              [{{Node, File}, filelib:file_size(filename:join(TraceDir, File))}
               || File <- lists:delete("zip", AllFiles)]);
        _ ->
            #{}
    end.
%% @doc Read up to `Limit' bytes, starting at byte `Position', from the local
%% log file whose name starts with `trace_<Name>_'. Invoked over rpc by
%% stream_log_file/2. Returns whatever read_file/3 returns, or
%% `{error, not_found}' when no file matches.
%% NOTE(review): more than one matching file (e.g. traces named "a" and "a_b")
%% is not handled and raises case_clause - confirm whether that can happen.
read_trace_file(Name, Position, Limit) ->
    TraceDir = emqx_trace:trace_dir(),
    {ok, AllFiles} = file:list_dir(TraceDir),
    TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_",
    Matches = [File || File <- AllFiles,
                       string:prefix(File, TracePrefix) =/= nomatch],
    case Matches of
        [] ->
            {error, not_found};
        [TraceFile] ->
            read_file(filename:join([TraceDir, TraceFile]), Position, Limit)
    end.
%% @doc Read up to `Bytes' bytes from `Path', starting at byte `Offset'.
%%
%% Returns:
%%   `{ok, Bin}'       - the data that was read,
%%   `{eof, Size}'     - nothing left to read at `Offset'; `Size' is the file size,
%%   `{error, Reason}' - the file could not be opened, positioned or read
%%                       (e.g. `enoent' when the file does not exist yet).
read_file(Path, Offset, Bytes) ->
    %% A missing or unreadable file is an expected condition (the trace may
    %% not have produced a log yet), so report it instead of crashing.
    case file:open(Path, [read, raw, binary]) of
        {ok, IoDevice} ->
            try
                read_chunk(IoDevice, Offset, Bytes)
            after
                file:close(IoDevice)
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% Seek to `Offset' (skipped for 0, the position of a freshly opened file)
%% and read one chunk; a failed seek is returned rather than ignored.
read_chunk(IoDevice, 0, Bytes) ->
    read_bytes(IoDevice, Bytes);
read_chunk(IoDevice, Offset, Bytes) ->
    case file:position(IoDevice, {bof, Offset}) of
        {ok, _NewPosition} -> read_bytes(IoDevice, Bytes);
        {error, Reason} -> {error, Reason}
    end.

%% Read one chunk from the current position, mapping `eof' to `{eof, Size}'.
read_bytes(IoDevice, Bytes) ->
    case file:read(IoDevice, Bytes) of
        {ok, Bin} ->
            {ok, Bin};
        {error, Reason} ->
            {error, Reason};
        eof ->
            {ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
            {eof, Size}
    end.
%% @doc Convert a node name binary to an atom without creating new atoms.
%% Returns `{ok, Atom}' when the atom already exists, otherwise (or on any
%% other failure) `{error, "node not found"}'.
to_node(Node) ->
    try binary_to_existing_atom(Node) of
        Atom -> {ok, Atom}
    catch
        _:_ -> {error, "node not found"}
    end.
%% @doc Build a map of `Node => Size' for `FileName', looking each
%% `{Node, FileName}' key up in `AllFiles'. Nodes with no entry get size 0.
collect_file_size(Nodes, FileName, AllFiles) ->
    maps:from_list(
      [{Node, maps:get({Node, FileName}, AllFiles, 0)} || Node <- Nodes]).
%% @doc Derive the display status of a trace.
%%   enabled and Now < Start          -> <<"waiting">>
%%   enabled and Start =< Now < End   -> <<"running">>
%%   disabled, or enabled and Now >= End (and not before Start) -> <<"stopped">>
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
status(true, _Start, End, Now) when Now < End -> <<"running">>;
status(Enable, _Start, _End, _Now) when is_boolean(Enable) -> <<"stopped">>.

View File

@ -298,23 +298,6 @@ t_trace_file(_Config) ->
ok = file:delete(File), ok = file:delete(File),
ok. ok.
%% Creates a clientid trace, produces some traffic from that client, then
%% checks that emqx_trace_api:download_zip_log/2 yields a non-empty zip file.
t_download_log(_Config) ->
    ClientId = <<"client-test-download">>,
    Start = to_rfc3339(erlang:system_time(second)),
    Name = <<"test_client_id">>,
    TraceSpec = [ {<<"name">>, Name}
                , {<<"type">>, clientid}
                , {<<"clientid">>, ClientId}
                , {<<"start_at">>, Start}
                ],
    ok = emqx_trace:create(TraceSpec),
    ct:sleep(50),
    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(Client),
    lists:foreach(fun(_) -> _ = emqtt:ping(Client) end, lists:seq(1, 5)),
    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
    {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
    ?assert(filelib:file_size(ZipFile) > 0),
    ok = emqtt:disconnect(Client),
    ok.
t_find_closed_time(_Config) -> t_find_closed_time(_Config) ->
DefaultMs = 60 * 15000, DefaultMs = 60 * 15000,
Now = erlang:system_time(second), Now = erlang:system_time(second),

View File

@ -420,8 +420,8 @@ t_mqtt_conn_update2(_) ->
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
wait_for_resource_ready(BridgeIDEgress, 5), wait_for_resource_ready(BridgeIDEgress, 5),
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := BridgeIDEgress ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(BridgeStr)), }, jsx:decode(BridgeStr)),

View File

@ -338,7 +338,8 @@ collect_trace_file(TraceLog) ->
cluster_call(Mod, Fun, Args, Timeout) -> cluster_call(Mod, Fun, Args, Timeout) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]), BadNodes =/= [] andalso
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}),
GoodRes. GoodRes.
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
@ -354,8 +355,12 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
{eof, Size} -> {eof, Size} ->
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
{200, #{meta => Meta, items => <<"">>}}; {200, #{meta => Meta, items => <<"">>}};
{error, enoent} -> %% the waiting trace should return "" not error.
Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
{200, #{meta => Meta, items => <<"">>}};
{error, Reason} -> {error, Reason} ->
logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]), ?SLOG(error, #{msg => "read_file_failed",
node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}),
{400, #{code => 'READ_FILE_ERROR', message => Reason}}; {400, #{code => 'READ_FILE_ERROR', message => Reason}};
{badrpc, nodedown} -> {badrpc, nodedown} ->
{400, #{code => 'RPC_ERROR', message => "BadRpc node down"}} {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}}

View File

@ -442,7 +442,7 @@ trace_off(Type, Filter) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Trace Cluster Command %% @doc Trace Cluster Command
traces(["list"]) -> traces(["list"]) ->
{ok, List} = emqx_trace_api:list_trace(get, []), {200, List} = emqx_mgmt_api_trace:trace(get, []),
case List of case List of
[] -> [] ->
emqx_ctl:print("Cluster Trace is empty~n", []); emqx_ctl:print("Cluster Trace is empty~n", []);

View File

@ -22,6 +22,9 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("stdlib/include/zip.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(HOST, "http://127.0.0.1:18083/"). -define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
@ -112,6 +115,40 @@ t_http_test(_Config) ->
unload(), unload(),
ok. ok.
%% Creates a clientid trace, produces some traffic from that client, downloads
%% the trace through the HTTP API and checks that the returned zip holds one
%% non-empty regular file named after this node and the trace.
t_download_log(_Config) ->
    ClientId = <<"client-test-download">>,
    Start = to_rfc3339(erlang:system_time(second)),
    Name = <<"test_client_id">>,
    load(),
    create_trace(Name, ClientId, Start),
    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(Client),
    lists:foreach(fun(_) -> _ = emqtt:ping(Client) end, lists:seq(1, 5)),
    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
    Header = auth_header_(),
    {ok, Binary} = request_api(get, api_path("trace/test_client_id/download"), Header),
    %% The table lists a comment entry first, then exactly one file entry.
    {ok, [_Comment, Entry]} = zip:table(Binary),
    #zip_file{ name = ZipName
             , info = #file_info{size = Size, type = regular, access = read_write}
             } = Entry,
    ?assert(Size > 0),
    ZipNamePrefix = lists:flatten(io_lib:format("~s-trace_~s", [node(), Name])),
    ?assertNotEqual(nomatch, re:run(ZipName, [ZipNamePrefix])),
    ok = emqtt:disconnect(Client),
    ok.
%% Test helper: create a clientid trace called Name for ClientId, starting at
%% Start, and wait for an `update_trace_done' trace point before returning.
%% The check stage then asserts that exactly one such trace point was recorded.
create_trace(Name, ClientId, Start) ->
?check_trace(
%% NOTE(review): timetrap is presumably in milliseconds - confirm against snabbkaffe.
#{timetrap => 900},
begin
ok = emqx_trace:create([{<<"name">>, Name},
{<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
%% Wait on the trace point instead of sleeping for a fixed time.
?block_until(#{?snk_kind := update_trace_done})
end,
fun(Trace) ->
?assertMatch([#{}], ?of_kind(update_trace_done, Trace))
end).
t_stream_log(_Config) -> t_stream_log(_Config) ->
application:set_env(emqx, allow_anonymous, true), application:set_env(emqx, allow_anonymous, true),
emqx_trace:clear(), emqx_trace:clear(),
@ -120,9 +157,7 @@ t_stream_log(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
Name = <<"test_stream_log">>, Name = <<"test_stream_log">>,
Start = to_rfc3339(Now - 10), Start = to_rfc3339(Now - 10),
ok = emqx_trace:create(#{<<"name">> => Name, create_trace(Name, ClientId, Start),
<<"type">> => clientid, <<"clientid">> => ClientId, <<"start_at">> => Start}),
ct:sleep(200),
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
[begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)], [begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)],