From 1b14b792865355000f7d4818ce0e4b398b2d124f Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 29 Nov 2021 15:18:09 +0800 Subject: [PATCH 1/2] fix: create trace schema at runtime --- apps/emqx_management/test/emqx_mgmt_SUITE.erl | 2 +- .../src/emqx_trace/emqx_trace.erl | 28 ++++++++----------- .../test/emqx_trace_SUITE.erl | 27 ++++++------------ 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 434e96e21..f21a0f66c 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -240,7 +240,7 @@ t_trace_cmd(_) -> logger:set_primary_config(level, error). t_traces_cmd(_) -> - emqx_trace:mnesia(boot), + emqx_trace:create_table(), Count1 = emqx_mgmt_cli:traces(["list"]), ?assertEqual(0, Count1), Error1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]), diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl index c71a457a6..135a1f53b 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl @@ -22,12 +22,6 @@ -logger_header("[Tracer]"). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -export([ publish/1 , subscribe/3 , unsubscribe/2 @@ -57,7 +51,9 @@ -define(MAX_SIZE, 30). -ifdef(TEST). --export([log_file/2]). +-export([ log_file/2 + , create_table/0 + ]). -endif. -export_type([ip_address/0]). @@ -72,15 +68,6 @@ , end_at :: integer() | undefined | '_' }). -mnesia(boot) -> - ok = ekka_mnesia:create_table(?TRACE, [ - {type, set}, - {disc_copies, [node()]}, - {record_name, ?TRACE}, - {attributes, record_info(fields, ?TRACE)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TRACE, disc_copies). - publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> @@ -198,6 +185,7 @@ format(Traces) -> end, Traces). init([]) -> + ok = create_table(), erlang:process_flag(trap_exit, true), OriginLogLevel = emqx_logger:get_primary_log_level(), ok = filelib:ensure_dir(trace_dir()), @@ -208,6 +196,14 @@ init([]) -> TRef = update_trace(Traces), {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. +create_table() -> + ok = ekka_mnesia:create_table(?TRACE, [ + {type, set}, + {disc_copies, [node()]}, + {record_name, ?TRACE}, + {attributes, record_info(fields, ?TRACE)}]), + ok = ekka_mnesia:copy_table(?TRACE, disc_copies). + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ok, State}. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index ffa2bc1fb..c08211e02 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -33,15 +33,22 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_plugin_libs), emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). -t_base_create_delete(_Config) -> +init_per_testcase(_, Config) -> + load(), ok = emqx_trace:clear(), + Config. + +end_per_testcase(_) -> + unload(), + ok. + +t_base_create_delete(_Config) -> Now = erlang:system_time(second), Start = to_rfc3339(Now), End = to_rfc3339(Now + 30 * 60), @@ -84,7 +91,6 @@ t_base_create_delete(_Config) -> ok. t_create_size_max(_Config) -> - emqx_trace:clear(), lists:map(fun(Seq) -> Name = list_to_binary("name" ++ integer_to_list(Seq)), Trace = [{name, Name}, {type, <<"topic">>}, @@ -100,7 +106,6 @@ t_create_size_max(_Config) -> ok. t_create_failed(_Config) -> - ok = emqx_trace:clear(), UnknownField = [{<<"unknown">>, 12}], {error, Reason1} = emqx_trace:create(UnknownField), ?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)), @@ -139,7 +144,6 @@ t_create_failed(_Config) -> ok. t_create_default(_Config) -> - ok = emqx_trace:clear(), {error, "name required"} = emqx_trace:create([]), ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]), @@ -170,7 +174,6 @@ t_create_default(_Config) -> ok. t_update_enable(_Config) -> - ok = emqx_trace:clear(), Name = <<"test-name">>, Now = erlang:system_time(second), End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), @@ -192,8 +195,6 @@ t_update_enable(_Config) -> ok. t_load_state(_Config) -> - emqx_trace:clear(), - load(), Now = erlang:system_time(second), Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, @@ -218,14 +219,11 @@ t_load_state(_Config) -> Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2), ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}], ?assertEqual(ExpectEnables2, lists:sort(Enables2)), - unload(), ok. t_client_event(_Config) -> application:set_env(emqx, allow_anonymous, true), - emqx_trace:clear(), ClientId = <<"client-test">>, - load(), Now = erlang:system_time(second), Start = to_rfc3339(Now), Name = <<"test_client_id_event">>, @@ -252,12 +250,9 @@ t_client_event(_Config) -> ?assert(erlang:byte_size(Bin) > 0), ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), ?assert(erlang:byte_size(Bin3) > 0), - unload(), ok. t_get_log_filename(_Config) -> - ok = emqx_trace:clear(), - load(), Now = erlang:system_time(second), Start = calendar:system_time_to_rfc3339(Now), End = calendar:system_time_to_rfc3339(Now + 2), @@ -274,7 +269,6 @@ t_get_log_filename(_Config) -> ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), ct:sleep(3000), ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), - unload(), ok. t_trace_file(_Config) -> @@ -290,8 +284,6 @@ t_trace_file(_Config) -> ok. t_download_log(_Config) -> - emqx_trace:clear(), - load(), ClientId = <<"client-test">>, Now = erlang:system_time(second), Start = to_rfc3339(Now), @@ -305,7 +297,6 @@ t_download_log(_Config) -> {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), ?assert(filelib:file_size(ZipFile) > 0), ok = emqtt:disconnect(Client), - unload(), ok. to_rfc3339(Second) -> From 0d218df14d19632eac72aa075d365dbdd978db07 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 29 Nov 2021 23:08:56 +0800 Subject: [PATCH 2/2] fix: replace ct:sleep/1 by filesync/2 --- .../src/emqx_trace/emqx_trace_api.erl | 6 ++- .../test/emqx_trace_SUITE.erl | 11 +++--- test/emqx_trace_handler_SUITE.erl | 37 ++++++++++++++++--- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index 7d982c00d..8e052ca66 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -107,8 +107,10 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> case Res of {ok, Node, Bin} -> ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, - ok = file:write_file(ZipName, Bin), - [Node ++ "-" ++ TraceLog | Acc]; + case file:write_file(ZipName, Bin) of + ok -> [Node ++ "-" ++ TraceLog | Acc]; + _ -> Acc + end; {error, Node, Reason} -> ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), Acc diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index c08211e02..94ab71270 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -229,21 +229,22 @@ t_client_event(_Config) -> Name = <<"test_client_id_event">>, ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), - ct:sleep(200), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), emqtt:ping(Client), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), - ct:sleep(200), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), ok = emqx_trace:create([{<<"name">>, <<"test_topic">>}, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), - ct:sleep(200), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]), ok = emqtt:disconnect(Client), - ct:sleep(200), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)), {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)), ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), @@ -293,7 +294,7 @@ t_download_log(_Config) -> {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], - ct:sleep(100), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), ?assert(filelib:file_size(ZipFile) > 0), ok = emqtt:disconnect(Client), diff --git a/test/emqx_trace_handler_SUITE.erl b/test/emqx_trace_handler_SUITE.erl index e314c7994..b42b1a591 100644 --- a/test/emqx_trace_handler_SUITE.erl +++ b/test/emqx_trace_handler_SUITE.erl @@ -62,7 +62,9 @@ t_trace_clientid(_Config) -> emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"), {error, {handler_not_added, {file_error, ".", eisdir}}} = emqx_trace_handler:install(clientid, <<"client5">>, debug, "."), - ct:sleep(100), + ok = filesync(<<"client">>, clientid), + ok = filesync(<<"client2">>, clientid), + ok = filesync(<<"client3">>, clientid), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/client.log")), @@ -83,7 +85,9 @@ t_trace_clientid(_Config) -> emqtt:connect(T), emqtt:publish(T, <<"a/b/c">>, <<"hi">>), emqtt:ping(T), - ct:sleep(200), + ok = filesync(<<"client">>, clientid), + ok = filesync(<<"client2">>, clientid), + ok = filesync(<<"client3">>, clientid), %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log". {ok, Bin} = file:read_file("tmp/client.log"), @@ -109,7 +113,8 @@ t_trace_topic(_Config) -> emqx_logger:set_log_level(debug), ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), - ct:sleep(100), + ok = filesync(<<"x/#">>, topic), + ok = filesync(<<"y/#">>, topic), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/topic_trace_x.log")), @@ -128,7 +133,8 @@ t_trace_topic(_Config) -> emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), emqtt:subscribe(T, <<"x/y/z">>), emqtt:unsubscribe(T, <<"x/y/z">>), - ct:sleep(200), + ok = filesync(<<"x/#">>, topic), + ok = filesync(<<"y/#">>, topic), {ok, Bin} = file:read_file("tmp/topic_trace_x.log"), ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), @@ -152,7 +158,8 @@ t_trace_ip_address(_Config) -> %% Start tracing ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"), ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"), - ct:sleep(100), + ok = filesync(<<"127.0.0.1">>, ip_address), + ok = filesync(<<"192.168.1.1">>, ip_address), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/ip_trace_x.log")), @@ -173,7 +180,8 @@ t_trace_ip_address(_Config) -> emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), emqtt:subscribe(T, <<"x/y/z">>), emqtt:unsubscribe(T, <<"x/y/z">>), - ct:sleep(200), + ok = filesync(<<"127.0.0.1">>, ip_address), + ok = filesync(<<"192.168.1.1">>, ip_address), {ok, Bin} = file:read_file("tmp/ip_trace_x.log"), ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), @@ -189,3 +197,20 @@ t_trace_ip_address(_Config) -> {error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>), emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()). + + +filesync(Name, Type) -> + filesync(Name, Type, 3). + +%% sometime the handler process is not started yet. +filesync(_Name, _Type, 0) -> ok; +filesync(Name, Type, Retry) -> + try + Handler = binary_to_atom(<<"trace_", + (atom_to_binary(Type))/binary, "_", Name/binary>>), + ok = logger_disk_log_h:filesync(Handler) + catch E:R -> + ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]), + ct:sleep(100), + filesync(Name, Type, Retry - 1) + end.