diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index d90aea9ef..cc4a905a4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -20,6 +20,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ api_spec/0, @@ -461,16 +462,31 @@ download_trace_log(get, #{bindings := #{name := Name}, query_string := Query}) - case parse_node(Query, undefined) of {ok, Node} -> TraceFiles = collect_trace_file(Node, TraceLog), - ZipDir = emqx_trace:zip_dir(), + %% We generate a session ID so that we name files + %% with unique names. Then we won't cause + %% overwrites for concurrent requests. + SessionId = emqx_misc:gen_id(), + ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]), + ok = file:make_dir(ZipDir), + %% Write files to ZipDir and create an in-memory zip file Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), - FileName = binary_to_list(Name) ++ ".zip", - ZipFileName = filename:join([ZipDir, FileName]), - {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), - %% emqx_trace:delete_files_after_send(ZipFileName, Zips), - %% TODO use file replace file_binary.(delete file after send is not ready now). - {ok, Binary} = file:read_file(ZipFile), - ZipName = filename:basename(ZipFile), - _ = file:delete(ZipFile), + ZipName = binary_to_list(Name) ++ ".zip", + Binary = + try + {ok, {ZipName, Bin}} = zip:zip(ZipName, Zips, [memory, {cwd, ZipDir}]), + Bin + after + %% emqx_trace:delete_files_after_send(ZipFileName, Zips), + %% TODO use file replace file_binary.(delete file after send is not ready now). + ok = file:del_dir_r(ZipDir) + end, + ?tp(trace_api_download_trace_log, #{ + files => Zips, + name => Name, + session_id => SessionId, + zip_dir => ZipDir, + zip_name => ZipName + }), Headers = #{ <<"content-type">> => <<"application/x-zip">>, <<"content-disposition">> => iolist_to_binary( diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 82d55bb6a..abab3f9b0 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -61,7 +61,8 @@ uri(Parts) -> %% compatible_mode will return as same as 'emqx_dashboard_api_test_helpers:request' request_api_with_body(Method, Url, Body) -> - request_api(Method, Url, [], auth_header_(), Body, #{compatible_mode => true}). + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + request_api(Method, Url, [], auth_header_(), Body, Opts). request_api(Method, Url) -> request_api(Method, Url, auth_header_()). @@ -111,15 +112,9 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when do_request_api(Method, Request, Opts) -> ReturnAll = maps:get(return_all, Opts, false), CompatibleMode = maps:get(compatible_mode, Opts, false), - ReqOpts = - case CompatibleMode of - true -> - [{body_format, binary}]; - _ -> - [] - end, - ct:pal("Method: ~p, Request: ~p", [Method, Request]), - case httpc:request(Method, Request, [], ReqOpts) of + HttpcReqOpts = maps:get(httpc_req_opts, Opts, []), + ct:pal("Method: ~p, Request: ~p, Opts: ~p", [Method, Request, Opts]), + case httpc:request(Method, Request, [], HttpcReqOpts) of {error, socket_closed_remotely} -> {error, socket_closed_remotely}; {ok, {{_, Code, _}, _Headers, Body}} when CompatibleMode -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index 0ba05b280..6962a9043 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -26,12 +26,6 @@ -include_lib("stdlib/include/zip.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(HOST, "http://127.0.0.1:18083/"). --define(API_VERSION, "v5"). --define(BASE_PATH, "api"). - --import(emqx_dashboard_SUITE, [auth_header_/0]). - %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -49,14 +43,14 @@ end_per_suite(_) -> t_http_test(_Config) -> emqx_trace:clear(), load(), - Header = auth_header_(), %% list - {ok, Empty} = request_api(get, api_path("trace"), Header), + {ok, Empty} = request_api(get, api_path("trace")), ?assertEqual([], json(Empty)), %% create ErrorTrace = #{}, - {error, {"HTTP/1.1", 400, "Bad Request"}, Body} = - request_api(post, api_path("trace"), Header, ErrorTrace), + Opts = #{return_all => true}, + {error, {{"HTTP/1.1", 400, "Bad Request"}, _, Body}} = + emqx_mgmt_api_test_util:request_api(post, api_path("trace"), [], [], ErrorTrace, Opts), ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, json(Body)), Name = <<"test-name">>, @@ -66,15 +60,15 @@ t_http_test(_Config) -> {<<"topic">>, <<"/x/y/z">>} ], - {ok, Create} = request_api(post, api_path("trace"), Header, Trace), + {ok, Create} = request_api(post, api_path("trace"), Trace), ?assertMatch(#{<<"name">> := Name}, json(Create)), - {ok, List} = request_api(get, api_path("trace"), Header), + {ok, List} = request_api(get, api_path("trace")), [Data] = json(List), ?assertEqual(Name, maps:get(<<"name">>, Data)), %% update - {ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}), + {ok, Update} = request_api(put, api_path("trace/test-name/stop"), #{}), ?assertEqual( #{ <<"enable">> => false, @@ -84,10 +78,10 @@ t_http_test(_Config) -> ), ?assertMatch( - {error, {"HTTP/1.1", 404, _}, _}, - request_api(put, api_path("trace/test-name-not-found/stop"), Header, #{}) + {error, {"HTTP/1.1", 404, _}}, + request_api(put, api_path("trace/test-name-not-found/stop"), #{}) ), - {ok, List1} = request_api(get, api_path("trace"), Header), + {ok, List1} = request_api(get, api_path("trace")), [Data1] = json(List1), Node = atom_to_binary(node()), ?assertMatch( @@ -104,11 +98,11 @@ t_http_test(_Config) -> ), %% delete - {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), + {ok, Delete} = request_api(delete, api_path("trace/test-name")), ?assertEqual(<<>>, Delete), - {error, {"HTTP/1.1", 404, "Not Found"}, DeleteNotFound} = - request_api(delete, api_path("trace/test-name"), Header), + {error, {{"HTTP/1.1", 404, "Not Found"}, _, DeleteNotFound}} = + emqx_mgmt_api_test_util:request_api(delete, api_path("trace/test-name"), [], [], [], Opts), ?assertEqual( #{ <<"code">> => <<"NOT_FOUND">>, @@ -117,14 +111,14 @@ t_http_test(_Config) -> json(DeleteNotFound) ), - {ok, List2} = request_api(get, api_path("trace"), Header), + {ok, List2} = request_api(get, api_path("trace")), ?assertEqual([], json(List2)), %% clear - {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), + {ok, Create1} = request_api(post, api_path("trace"), Trace), ?assertMatch(#{<<"name">> := Name}, json(Create1)), - {ok, Clear} = request_api(delete, api_path("trace"), Header), + {ok, Clear} = request_api(delete, api_path("trace")), ?assertEqual(<<>>, Clear), unload(), @@ -132,27 +126,26 @@ t_http_test(_Config) -> t_create_failed(_Config) -> load(), - Header = auth_header_(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], BadName1 = {<<"name">>, <<"test/bad">>}, ?assertMatch( - {error, {"HTTP/1.1", 400, _}, _}, - request_api(post, api_path("trace"), Header, [BadName1 | Trace]) + {error, {"HTTP/1.1", 400, _}}, + request_api(post, api_path("trace"), [BadName1 | Trace]) ), BadName2 = {<<"name">>, list_to_binary(lists:duplicate(257, "t"))}, ?assertMatch( - {error, {"HTTP/1.1", 400, _}, _}, - request_api(post, api_path("trace"), Header, [BadName2 | Trace]) + {error, {"HTTP/1.1", 400, _}}, + request_api(post, api_path("trace"), [BadName2 | Trace]) ), %% already_exist GoodName = {<<"name">>, <<"test-name-0">>}, - {ok, Create} = request_api(post, api_path("trace"), Header, [GoodName | Trace]), + {ok, Create} = request_api(post, api_path("trace"), [GoodName | Trace]), ?assertMatch(#{<<"name">> := <<"test-name-0">>}, json(Create)), ?assertMatch( - {error, {"HTTP/1.1", 409, _}, _}, - request_api(post, api_path("trace"), Header, [GoodName | Trace]) + {error, {"HTTP/1.1", 409, _}}, + request_api(post, api_path("trace"), [GoodName | Trace]) ), %% MAX Limited @@ -170,17 +163,18 @@ t_create_failed(_Config) -> ), GoodName1 = {<<"name">>, <<"test-name-1">>}, ?assertMatch( - {error, {"HTTP/1.1", 400, _}, _}, - request_api(post, api_path("trace"), Header, [GoodName1 | Trace]) + {error, {"HTTP/1.1", 400, _}}, + request_api(post, api_path("trace"), [GoodName1 | Trace]) ), %% clear - ?assertMatch({ok, _}, request_api(delete, api_path("trace"), Header, [])), - {ok, Create} = request_api(post, api_path("trace"), Header, [GoodName | Trace]), + ?assertMatch({ok, _}, request_api(delete, api_path("trace"), [])), + {ok, Create1} = request_api(post, api_path("trace"), [GoodName | Trace]), + ?assertMatch(#{<<"name">> := <<"test-name-0">>}, json(Create1)), %% new name but same trace GoodName2 = {<<"name">>, <<"test-name-1">>}, ?assertMatch( - {error, {"HTTP/1.1", 409, _}, _}, - request_api(post, api_path("trace"), Header, [GoodName2 | Trace]) + {error, {"HTTP/1.1", 409, _}}, + request_api(post, api_path("trace"), [GoodName2 | Trace]) ), unload(), @@ -202,14 +196,13 @@ t_log_file(_Config) -> || _ <- lists:seq(1, 5) ], ok = emqx_trace_handler_SUITE:filesync(Name, clientid), - Header = auth_header_(), ?assertMatch( - {error, {"HTTP/1.1", 404, "Not Found"}, _}, - request_api(get, api_path("trace/test_client_not_found/log_detail"), Header) + {error, {"HTTP/1.1", 404, "Not Found"}}, + request_api(get, api_path("trace/test_client_not_found/log_detail")) ), - {ok, Detail} = request_api(get, api_path("trace/test_client_id/log_detail"), Header), + {ok, Detail} = request_api(get, api_path("trace/test_client_id/log_detail")), ?assertMatch([#{<<"mtime">> := _, <<"size">> := _, <<"node">> := _}], json(Detail)), - {ok, Binary} = request_api(get, api_path("trace/test_client_id/download"), Header), + {ok, Binary} = request_api(get, api_path("trace/test_client_id/download")), {ok, [ Comment, #zip_file{ @@ -221,7 +214,7 @@ t_log_file(_Config) -> ZipNamePrefix = lists:flatten(io_lib:format("~s-trace_~s", [node(), Name])), ?assertNotEqual(nomatch, re:run(ZipName, [ZipNamePrefix])), Path = api_path("trace/test_client_id/download?node=" ++ atom_to_list(node())), - {ok, Binary2} = request_api(get, Path, Header), + {ok, Binary2} = request_api(get, Path), ?assertMatch( {ok, [ Comment, @@ -232,25 +225,22 @@ t_log_file(_Config) -> ]}, zip:table(Binary2) ), - {error, {_, 400, _}, _} = + {error, {_, 400, _}} = request_api( get, - api_path("trace/test_client_id/download?node=unknonwn_node"), - Header + api_path("trace/test_client_id/download?node=unknonwn_node") ), - {error, {_, 400, _}, _} = + {error, {_, 400, _}} = request_api( get, % known atom but unknown node - api_path("trace/test_client_id/download?node=undefined"), - Header + api_path("trace/test_client_id/download?node=undefined") ), ?assertMatch( - {error, {"HTTP/1.1", 404, "Not Found"}, _}, + {error, {"HTTP/1.1", 404, "Not Found"}}, request_api( get, - api_path("trace/test_client_not_found/download?node=" ++ atom_to_list(node())), - Header + api_path("trace/test_client_not_found/download?node=" ++ atom_to_list(node())) ) ), ok = emqtt:disconnect(Client), @@ -297,64 +287,87 @@ t_stream_log(_Config) -> ct:pal("FileName: ~p", [File]), {ok, FileBin} = file:read_file(File), ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), - Header = auth_header_(), - {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10")), #{<<"meta">> := Meta, <<"items">> := Bin} = json(Binary), ?assertEqual(10, byte_size(Bin)), ?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta), Path = api_path("trace/test_stream_log/log?position=20&bytes=10"), - {ok, Binary1} = request_api(get, Path, Header), + {ok, Binary1} = request_api(get, Path), #{<<"meta">> := Meta1, <<"items">> := Bin1} = json(Binary1), ?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1), ?assertEqual(10, byte_size(Bin1)), - {error, {_, 400, _}, _} = + {error, {_, 400, _}} = request_api( get, - api_path("trace/test_stream_log/log?node=unknonwn_node"), - Header + api_path("trace/test_stream_log/log?node=unknonwn_node") ), - {error, {_, 400, _}, _} = + {error, {_, 400, _}} = request_api( get, % known atom but not a node - api_path("trace/test_stream_log/log?node=undefined"), - Header + api_path("trace/test_stream_log/log?node=undefined") ), - {error, {_, 404, _}, _} = + {error, {_, 404, _}} = request_api( get, - api_path("trace/test_stream_log_not_found/log"), - Header + api_path("trace/test_stream_log_not_found/log") ), unload(), ok. +t_trace_files_are_deleted_after_download(_Config) -> + ClientId = <<"client-test-delete-after-download">>, + Now = erlang:system_time(second), + Name = <<"test_client_id">>, + load(), + create_trace(Name, ClientId, Now), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [ + begin + _ = emqtt:ping(Client) + end + || _ <- lists:seq(1, 5) + ], + ok = emqtt:disconnect(Client), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + + %% Check that files have been removed after download and that zip + %% directories uses unique session ids + ?check_trace( + begin + %% Download two zip files + Path = api_path(["trace/", binary_to_list(Name), "/download"]), + {ok, Binary1} = request_api(get, Path), + {ok, Binary2} = request_api(get, Path), + ?assertMatch({ok, _}, zip:table(Binary1)), + ?assertMatch({ok, _}, zip:table(Binary2)) + end, + fun(Trace) -> + [ + #{session_id := SessionId1, zip_dir := ZipDir1}, + #{session_id := SessionId2, zip_dir := ZipDir2} + ] = ?of_kind(trace_api_download_trace_log, Trace), + ?assertEqual({error, enoent}, file:list_dir(ZipDir1)), + ?assertEqual({error, enoent}, file:list_dir(ZipDir2)), + ?assertNotEqual(SessionId1, SessionId2), + ?assertNotEqual(ZipDir1, ZipDir2) + end + ), + ok. + to_rfc3339(Second) -> list_to_binary(calendar:system_time_to_rfc3339(Second)). -request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). +request_api(Method, Url) -> + request_api(Method, Url, []). -request_api(Method, Url, Auth, Body) -> - Request = {Url, [Auth], "application/json", emqx_json:encode(Body)}, - do_request_api(Method, Request). - -do_request_api(Method, Request) -> - ct:pal("Method: ~p, Request: ~p", [Method, Request]), - case httpc:request(Method, Request, [], [{body_format, binary}]) of - {error, socket_closed_remotely} -> - {error, socket_closed_remotely}; - {error, {shutdown, server_closed}} -> - {error, server_closed}; - {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when - Code =:= 200 orelse Code =:= 201 orelse Code =:= 204 - -> - {ok, Return}; - {ok, {Reason, _Header, Body}} -> - {error, Reason, Body} - end. +request_api(Method, Url, Body) -> + Opts = #{httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, Url, [], [], Body, Opts). api_path(Path) -> - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]). + emqx_mgmt_api_test_util:api_path([Path]). json(Data) -> {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), diff --git a/changes/v5.0.15/fix-9781.en.md b/changes/v5.0.15/fix-9781.en.md new file mode 100644 index 000000000..2b34ddc24 --- /dev/null +++ b/changes/v5.0.15/fix-9781.en.md @@ -0,0 +1 @@ +Trace files were left on a node when creating a zip file for download. They are now removed when the file is sent. Also, concurrent downloads will no longer interfere with each other. diff --git a/changes/v5.0.15/fix-9781.zh.md b/changes/v5.0.15/fix-9781.zh.md new file mode 100644 index 000000000..5c4cee0f5 --- /dev/null +++ b/changes/v5.0.15/fix-9781.zh.md @@ -0,0 +1 @@ +当下载 日志追踪 的日志时,一些中间文件将存留在处理节点上,现在这个问题得到了修复。同时,并发下载日志将不再相互干扰。