From 07ba4ad05ef2f979cbf2de94366f049f86cb6d53 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 12:48:40 +0800 Subject: [PATCH 1/3] fix: delete error log when file enoent. delete emqx_trace_api, replace LOG by SLOG --- apps/emqx/src/emqx_trace/emqx_trace_api.erl | 228 ------------------ apps/emqx/test/emqx_trace_SUITE.erl | 17 -- .../src/emqx_mgmt_api_trace.erl | 9 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 2 +- .../test/emqx_mgmt_trace_api_SUITE.erl | 26 ++ 5 files changed, 34 insertions(+), 248 deletions(-) delete mode 100644 apps/emqx/src/emqx_trace/emqx_trace_api.erl diff --git a/apps/emqx/src/emqx_trace/emqx_trace_api.erl b/apps/emqx/src/emqx_trace/emqx_trace_api.erl deleted file mode 100644 index bc276923d..000000000 --- a/apps/emqx/src/emqx_trace/emqx_trace_api.erl +++ /dev/null @@ -1,228 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_trace_api). --include_lib("emqx/include/logger.hrl"). --include_lib("kernel/include/file.hrl"). - -%% API --export([ list_trace/2 - , create_trace/2 - , update_trace/2 - , delete_trace/2 - , clear_traces/2 - , download_zip_log/2 - , stream_log_file/2 -]). --export([ read_trace_file/3 - , get_trace_size/0 - ]). - --define(TO_BIN(_B_), iolist_to_binary(_B_)). --define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}). - -list_trace(_, _Params) -> - case emqx_trace:list() of - [] -> {ok, []}; - List0 -> - List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List0), - Nodes = mria_mnesia:running_nodes(), - TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000), - AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), - Now = erlang:system_time(second), - Traces = - lists:map(fun(Trace = #{name := Name, start_at := Start, - end_at := End, enable := Enable, type := Type, filter := Filter}) -> - FileName = emqx_trace:filename(Name, Start), - LogSize = collect_file_size(Nodes, FileName, AllFileSize), - Trace0 = maps:without([enable, filter], Trace), - Trace0#{ log_size => LogSize - , Type => Filter - , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) - , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) - , status => status(Enable, Start, End, Now) - } - end, emqx_trace:format(List)), - {ok, Traces} - end. - -create_trace(_, Param) -> - case emqx_trace:create(Param) of - ok -> ok; - {error, {already_existed, Name}} -> - {error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])}; - {error, {duplicate_condition, Name}} -> - {error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])}; - {error, Reason} -> - {error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)} - end. - -delete_trace(#{name := Name}, _Param) -> - case emqx_trace:delete(Name) of - ok -> ok; - {error, not_found} -> ?NOT_FOUND(Name) - end. - -clear_traces(_, _) -> - emqx_trace:clear(). - -update_trace(#{name := Name, operation := Operation}, _Param) -> - Enable = case Operation of disable -> false; enable -> true end, - case emqx_trace:update(Name, Enable) of - ok -> {ok, #{enable => Enable, name => Name}}; - {error, not_found} -> ?NOT_FOUND(Name) - end. - -%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. -%% cowboy_compress_h will auto encode gzip format. -download_zip_log(#{name := Name}, _Param) -> - case emqx_trace:get_trace_filename(Name) of - {ok, TraceLog} -> - TraceFiles = collect_trace_file(TraceLog), - ZipDir = emqx_trace:zip_dir(), - Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), - ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", - {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), - emqx_trace:delete_files_after_send(ZipFileName, Zips), - {ok, ZipFile}; - {error, Reason} -> - {error, Reason} - end. - -group_trace_file(ZipDir, TraceLog, TraceFiles) -> - lists:foldl(fun(Res, Acc) -> - case Res of - {ok, Node, Bin} -> - ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, - case file:write_file(ZipName, Bin) of - ok -> [Node ++ "-" ++ TraceLog | Acc]; - Error -> - ?SLOG(error, #{ - msg => "write_file_failed", - error => Error, - zip_name => ZipName, - byte_size => byte_size(Bin)}), - Acc - end; - {error, Node, Reason} -> - ?SLOG(error, #{ - msg => "download_trace_log_failed", - node => Node, - trace_log => TraceLog, - reason => Reason - }), - Acc - end - end, [], TraceFiles). - -collect_trace_file(TraceLog) -> - cluster_call(emqx_trace, trace_file, [TraceLog], 60000). - -cluster_call(Mod, Fun, Args, Timeout) -> - Nodes = mria_mnesia:running_nodes(), - {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), - BadNodes =/= [] andalso - ?SLOG(error, #{ - msg => "rpc_call_failed", - bad_nodes => BadNodes, - mfa => {Mod, Fun, Args } - }), - GoodRes. - -stream_log_file(#{name := Name}, Params) -> - Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), - Position0 = proplists:get_value(<<"position">>, Params, <<"0">>), - Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>), - case to_node(Node0) of - {ok, Node} -> - Position = binary_to_integer(Position0), - Bytes = binary_to_integer(Bytes0), - case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of - {ok, Bin} -> - Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, - {ok, #{meta => Meta, items => Bin}}; - {eof, Size} -> - Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, - {ok, #{meta => Meta, items => <<"">>}}; - {error, Reason} -> - logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), - {error, Reason}; - {badrpc, nodedown} -> - {error, "BadRpc node down"} - end; - {error, Reason} -> {error, Reason} - end. - -get_trace_size() -> - TraceDir = emqx_trace:trace_dir(), - Node = node(), - case file:list_dir(TraceDir) of - {ok, AllFiles} -> - lists:foldl(fun(File, Acc) -> - FullFileName = filename:join(TraceDir, File), - Acc#{{Node, File} => filelib:file_size(FullFileName)} - end, #{}, lists:delete("zip", AllFiles)); - _ -> #{} - end. - -%% this is an rpc call for stream_log_file/2 -read_trace_file(Name, Position, Limit) -> - TraceDir = emqx_trace:trace_dir(), - {ok, AllFiles} = file:list_dir(TraceDir), - TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_", - Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end, - case lists:filter(Filter, AllFiles) of - [TraceFile] -> - TracePath = filename:join([TraceDir, TraceFile]), - read_file(TracePath, Position, Limit); - [] -> {error, not_found} - end. - -read_file(Path, Offset, Bytes) -> - {ok, IoDevice} = file:open(Path, [read, raw, binary]), - try - _ = case Offset of - 0 -> ok; - _ -> file:position(IoDevice, {bof, Offset}) - end, - case file:read(IoDevice, Bytes) of - {ok, Bin} -> {ok, Bin}; - {error, Reason} -> {error, Reason}; - eof -> - {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), - {eof, Size} - end - after - file:close(IoDevice) - end. - -to_node(Node) -> - try {ok, binary_to_existing_atom(Node)} - catch _:_ -> - {error, "node not found"} - end. - -collect_file_size(Nodes, FileName, AllFiles) -> - lists:foldl(fun(Node, Acc) -> - Size = maps:get({Node, FileName}, AllFiles, 0), - Acc#{Node => Size} - end, #{}, Nodes). - -%% status(false, _Start, End, Now) when End > Now -> <<"stopped">>; -status(false, _Start, _End, _Now) -> <<"stopped">>; -status(true, Start, _End, Now) when Now < Start -> <<"waiting">>; -status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; -status(true, _Start, _End, _Now) -> <<"running">>. diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 8a0ed1846..a1cad9d27 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -298,23 +298,6 @@ t_trace_file(_Config) -> ok = file:delete(File), ok. -t_download_log(_Config) -> - ClientId = <<"client-test-download">>, - Now = erlang:system_time(second), - Start = to_rfc3339(Now), - Name = <<"test_client_id">>, - ok = emqx_trace:create([{<<"name">>, Name}, - {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(50), - {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(Client), - [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], - ok = emqx_trace_handler_SUITE:filesync(Name, clientid), - {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), - ?assert(filelib:file_size(ZipFile) > 0), - ok = emqtt:disconnect(Client), - ok. - t_find_closed_time(_Config) -> DefaultMs = 60 * 15000, Now = erlang:system_time(second), diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 9fa58d83a..221cbbeec 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -338,7 +338,8 @@ collect_trace_file(TraceLog) -> cluster_call(Mod, Fun, Args, Timeout) -> Nodes = mria_mnesia:running_nodes(), {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), - BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]), + BadNodes =/= [] andalso + ?SLOG(error, #{msg => "rpc call failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}), GoodRes. stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> @@ -354,8 +355,12 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> {eof, Size} -> Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, {200, #{meta => Meta, items => <<"">>}}; + {error, enoent} -> %% the waiting trace should return "" not error. + Meta = #{<<"position">> => Position, <<"bytes">> => Bytes}, + {200, #{meta => Meta, items => <<"">>}}; {error, Reason} -> - logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]), + ?SLOG(error, #{msg => "read_file_failed", + node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}), {400, #{code => 'READ_FILE_ERROR', message => Reason}}; {badrpc, nodedown} -> {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}} diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 45869958d..cd5474261 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -442,7 +442,7 @@ trace_off(Type, Filter) -> %%-------------------------------------------------------------------- %% @doc Trace Cluster Command traces(["list"]) -> - {ok, List} = emqx_trace_api:list_trace(get, []), + {200, List} = emqx_mgmt_api_trace:trace(get, []), case List of [] -> emqx_ctl:print("Cluster Trace is empty~n", []); diff --git a/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl index 0a2a618fb..c3d40117e 100644 --- a/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl @@ -22,6 +22,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("stdlib/include/zip.hrl"). -define(HOST, "http://127.0.0.1:18083/"). -define(API_VERSION, "v5"). @@ -112,6 +114,30 @@ t_http_test(_Config) -> unload(), ok. +t_download_log(_Config) -> + ClientId = <<"client-test-download">>, + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + Name = <<"test_client_id">>, + load(), + ok = emqx_trace:create([{<<"name">>, Name}, + {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ct:sleep(50), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + Header = auth_header_(), + {ok, Binary} = request_api(get, api_path("trace/test_client_id/download"), Header), + {ok, [_Comment, + #zip_file{name = ZipName, info = #file_info{size = Size, type = regular, access = read_write}}]} + = zip:table(Binary), + ?assert(Size > 0), + ZipNamePrefix = lists:flatten(io_lib:format("~s-trace_~s", [node(), Name])), + ?assertNotEqual(nomatch, re:run(ZipName, [ZipNamePrefix])), + ok = emqtt:disconnect(Client), + ok. + t_stream_log(_Config) -> application:set_env(emqx, allow_anonymous, true), emqx_trace:clear(), From 977b1bb7ec051f64e534c7ce37427b87ae135a4c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jan 2022 11:08:40 +0800 Subject: [PATCH 2/3] chore(test): add snabbkaffe for create_trace test --- apps/emqx/src/emqx_trace/emqx_trace.erl | 2 ++ .../test/emqx_mgmt_trace_api_SUITE.erl | 21 +++++++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index a8fed05a6..51eea43e8 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -boot_mnesia({mnesia, [boot]}). -export([mnesia/1]). @@ -261,6 +262,7 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> Traces = get_enable_trace(), NextTRef = update_trace(Traces), update_trace_handler(), + ?tp(update_trace_done, #{}), {noreply, State#{timer => NextTRef}}; handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl index c3d40117e..c2b555891 100644 --- a/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("kernel/include/file.hrl"). -include_lib("stdlib/include/zip.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(HOST, "http://127.0.0.1:18083/"). -define(API_VERSION, "v5"). @@ -120,9 +121,7 @@ t_download_log(_Config) -> Start = to_rfc3339(Now), Name = <<"test_client_id">>, load(), - ok = emqx_trace:create([{<<"name">>, Name}, - {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(50), + create_trace(Name, ClientId, Start), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], @@ -138,6 +137,18 @@ t_download_log(_Config) -> ok = emqtt:disconnect(Client), ok. +create_trace(Name, ClientId, Start) -> + ?check_trace( + #{timetrap => 900}, + begin + ok = emqx_trace:create([{<<"name">>, Name}, + {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ?block_until(#{?snk_kind := update_trace_done}) + end, + fun(Trace) -> + ?assertMatch([#{}], ?of_kind(update_trace_done, Trace)) + end). + t_stream_log(_Config) -> application:set_env(emqx, allow_anonymous, true), emqx_trace:clear(), @@ -146,9 +157,7 @@ t_stream_log(_Config) -> Now = erlang:system_time(second), Name = <<"test_stream_log">>, Start = to_rfc3339(Now - 10), - ok = emqx_trace:create(#{<<"name">> => Name, - <<"type">> => clientid, <<"clientid">> => ClientId, <<"start_at">> => Start}), - ct:sleep(200), + create_trace(Name, ClientId, Start), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)], From 0ec111d4e04f4394f5e926791780290b88e19b2e Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jan 2022 14:40:48 +0800 Subject: [PATCH 3/3] fix(test): flaky emqx_connector_api_SUITE test case. --- apps/emqx_connector/test/emqx_connector_api_SUITE.erl | 2 +- apps/emqx_management/src/emqx_mgmt_api_trace.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 2819cbbc0..8ca132b16 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -420,8 +420,8 @@ t_mqtt_conn_update2(_) -> %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), wait_for_resource_ready(BridgeIDEgress, 5), + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 221cbbeec..324f0f227 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -339,7 +339,7 @@ cluster_call(Mod, Fun, Args, Timeout) -> Nodes = mria_mnesia:running_nodes(), {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), BadNodes =/= [] andalso - ?SLOG(error, #{msg => "rpc call failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}), + ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}), GoodRes. stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->