Merge pull request #6324 from zhongwencool/create-trace-schema-runtime
fix: create trace schema at runtime
This commit is contained in:
commit
20a98bee62
|
@ -240,7 +240,7 @@ t_trace_cmd(_) ->
|
||||||
logger:set_primary_config(level, error).
|
logger:set_primary_config(level, error).
|
||||||
|
|
||||||
t_traces_cmd(_) ->
|
t_traces_cmd(_) ->
|
||||||
emqx_trace:mnesia(boot),
|
emqx_trace:create_table(),
|
||||||
Count1 = emqx_mgmt_cli:traces(["list"]),
|
Count1 = emqx_mgmt_cli:traces(["list"]),
|
||||||
?assertEqual(0, Count1),
|
?assertEqual(0, Count1),
|
||||||
Error1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]),
|
Error1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]),
|
||||||
|
|
|
@ -22,12 +22,6 @@
|
||||||
|
|
||||||
-logger_header("[Tracer]").
|
-logger_header("[Tracer]").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
|
||||||
-export([mnesia/1]).
|
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
|
||||||
|
|
||||||
-export([ publish/1
|
-export([ publish/1
|
||||||
, subscribe/3
|
, subscribe/3
|
||||||
, unsubscribe/2
|
, unsubscribe/2
|
||||||
|
@ -57,7 +51,9 @@
|
||||||
-define(MAX_SIZE, 30).
|
-define(MAX_SIZE, 30).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([log_file/2]).
|
-export([ log_file/2
|
||||||
|
, create_table/0
|
||||||
|
]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-export_type([ip_address/0]).
|
-export_type([ip_address/0]).
|
||||||
|
@ -72,15 +68,6 @@
|
||||||
, end_at :: integer() | undefined | '_'
|
, end_at :: integer() | undefined | '_'
|
||||||
}).
|
}).
|
||||||
|
|
||||||
mnesia(boot) ->
|
|
||||||
ok = ekka_mnesia:create_table(?TRACE, [
|
|
||||||
{type, set},
|
|
||||||
{disc_copies, [node()]},
|
|
||||||
{record_name, ?TRACE},
|
|
||||||
{attributes, record_info(fields, ?TRACE)}]);
|
|
||||||
mnesia(copy) ->
|
|
||||||
ok = ekka_mnesia:copy_table(?TRACE, disc_copies).
|
|
||||||
|
|
||||||
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
||||||
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
||||||
is_binary(From); is_atom(From) ->
|
is_binary(From); is_atom(From) ->
|
||||||
|
@ -198,6 +185,7 @@ format(Traces) ->
|
||||||
end, Traces).
|
end, Traces).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
ok = create_table(),
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
||||||
ok = filelib:ensure_dir(trace_dir()),
|
ok = filelib:ensure_dir(trace_dir()),
|
||||||
|
@ -208,6 +196,14 @@ init([]) ->
|
||||||
TRef = update_trace(Traces),
|
TRef = update_trace(Traces),
|
||||||
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
||||||
|
|
||||||
|
create_table() ->
|
||||||
|
ok = ekka_mnesia:create_table(?TRACE, [
|
||||||
|
{type, set},
|
||||||
|
{disc_copies, [node()]},
|
||||||
|
{record_name, ?TRACE},
|
||||||
|
{attributes, record_info(fields, ?TRACE)}]),
|
||||||
|
ok = ekka_mnesia:copy_table(?TRACE, disc_copies).
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
|
@ -107,8 +107,10 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
||||||
case Res of
|
case Res of
|
||||||
{ok, Node, Bin} ->
|
{ok, Node, Bin} ->
|
||||||
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
||||||
ok = file:write_file(ZipName, Bin),
|
case file:write_file(ZipName, Bin) of
|
||||||
[Node ++ "-" ++ TraceLog | Acc];
|
ok -> [Node ++ "-" ++ TraceLog | Acc];
|
||||||
|
_ -> Acc
|
||||||
|
end;
|
||||||
{error, Node, Reason} ->
|
{error, Node, Reason} ->
|
||||||
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
||||||
Acc
|
Acc
|
||||||
|
|
|
@ -33,15 +33,22 @@ all() ->
|
||||||
emqx_ct:all(?MODULE).
|
emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:load(emqx_plugin_libs),
|
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
t_base_create_delete(_Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
load(),
|
||||||
ok = emqx_trace:clear(),
|
ok = emqx_trace:clear(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_) ->
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_base_create_delete(_Config) ->
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
End = to_rfc3339(Now + 30 * 60),
|
End = to_rfc3339(Now + 30 * 60),
|
||||||
|
@ -84,7 +91,6 @@ t_base_create_delete(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_size_max(_Config) ->
|
t_create_size_max(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
lists:map(fun(Seq) ->
|
lists:map(fun(Seq) ->
|
||||||
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||||
Trace = [{name, Name}, {type, <<"topic">>},
|
Trace = [{name, Name}, {type, <<"topic">>},
|
||||||
|
@ -100,7 +106,6 @@ t_create_size_max(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_failed(_Config) ->
|
t_create_failed(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
UnknownField = [{<<"unknown">>, 12}],
|
UnknownField = [{<<"unknown">>, 12}],
|
||||||
{error, Reason1} = emqx_trace:create(UnknownField),
|
{error, Reason1} = emqx_trace:create(UnknownField),
|
||||||
?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
|
?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
|
||||||
|
@ -139,7 +144,6 @@ t_create_failed(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_create_default(_Config) ->
|
t_create_default(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
{error, "name required"} = emqx_trace:create([]),
|
{error, "name required"} = emqx_trace:create([]),
|
||||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
|
{<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
|
||||||
|
@ -170,7 +174,6 @@ t_create_default(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_update_enable(_Config) ->
|
t_update_enable(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
Name = <<"test-name">>,
|
Name = <<"test-name">>,
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||||
|
@ -192,8 +195,6 @@ t_update_enable(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_load_state(_Config) ->
|
t_load_state(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
||||||
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
||||||
|
@ -218,46 +219,41 @@ t_load_state(_Config) ->
|
||||||
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
||||||
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
||||||
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_client_event(_Config) ->
|
t_client_event(_Config) ->
|
||||||
application:set_env(emqx, allow_anonymous, true),
|
application:set_env(emqx, allow_anonymous, true),
|
||||||
emqx_trace:clear(),
|
|
||||||
ClientId = <<"client-test">>,
|
ClientId = <<"client-test">>,
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
Name = <<"test_client_id_event">>,
|
Name = <<"test_client_id_event">>,
|
||||||
ok = emqx_trace:create([{<<"name">>, Name},
|
ok = emqx_trace:create([{<<"name">>, Name},
|
||||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||||
ct:sleep(200),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
emqtt:ping(Client),
|
emqtt:ping(Client),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
||||||
ct:sleep(200),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
||||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
||||||
ct:sleep(200),
|
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
|
||||||
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
||||||
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
ct:sleep(200),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
|
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
|
||||||
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
||||||
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
||||||
?assert(erlang:byte_size(Bin) > 0),
|
?assert(erlang:byte_size(Bin) > 0),
|
||||||
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
||||||
?assert(erlang:byte_size(Bin3) > 0),
|
?assert(erlang:byte_size(Bin3) > 0),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_get_log_filename(_Config) ->
|
t_get_log_filename(_Config) ->
|
||||||
ok = emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = calendar:system_time_to_rfc3339(Now),
|
Start = calendar:system_time_to_rfc3339(Now),
|
||||||
End = calendar:system_time_to_rfc3339(Now + 2),
|
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||||
|
@ -274,7 +270,6 @@ t_get_log_filename(_Config) ->
|
||||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
ct:sleep(3000),
|
ct:sleep(3000),
|
||||||
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_trace_file(_Config) ->
|
t_trace_file(_Config) ->
|
||||||
|
@ -290,8 +285,6 @@ t_trace_file(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_download_log(_Config) ->
|
t_download_log(_Config) ->
|
||||||
emqx_trace:clear(),
|
|
||||||
load(),
|
|
||||||
ClientId = <<"client-test">>,
|
ClientId = <<"client-test">>,
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Start = to_rfc3339(Now),
|
Start = to_rfc3339(Now),
|
||||||
|
@ -301,11 +294,10 @@ t_download_log(_Config) ->
|
||||||
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||||
ct:sleep(100),
|
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
|
||||||
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
|
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
|
||||||
?assert(filelib:file_size(ZipFile) > 0),
|
?assert(filelib:file_size(ZipFile) > 0),
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
unload(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
to_rfc3339(Second) ->
|
to_rfc3339(Second) ->
|
||||||
|
|
|
@ -62,7 +62,9 @@ t_trace_clientid(_Config) ->
|
||||||
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
||||||
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
||||||
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
||||||
ct:sleep(100),
|
ok = filesync(<<"client">>, clientid),
|
||||||
|
ok = filesync(<<"client2">>, clientid),
|
||||||
|
ok = filesync(<<"client3">>, clientid),
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
%% Verify the tracing file exits
|
||||||
?assert(filelib:is_regular("tmp/client.log")),
|
?assert(filelib:is_regular("tmp/client.log")),
|
||||||
|
@ -83,7 +85,9 @@ t_trace_clientid(_Config) ->
|
||||||
emqtt:connect(T),
|
emqtt:connect(T),
|
||||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||||
emqtt:ping(T),
|
emqtt:ping(T),
|
||||||
ct:sleep(200),
|
ok = filesync(<<"client">>, clientid),
|
||||||
|
ok = filesync(<<"client2">>, clientid),
|
||||||
|
ok = filesync(<<"client3">>, clientid),
|
||||||
|
|
||||||
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
||||||
{ok, Bin} = file:read_file("tmp/client.log"),
|
{ok, Bin} = file:read_file("tmp/client.log"),
|
||||||
|
@ -109,7 +113,8 @@ t_trace_topic(_Config) ->
|
||||||
emqx_logger:set_log_level(debug),
|
emqx_logger:set_log_level(debug),
|
||||||
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
||||||
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
||||||
ct:sleep(100),
|
ok = filesync(<<"x/#">>, topic),
|
||||||
|
ok = filesync(<<"y/#">>, topic),
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
%% Verify the tracing file exits
|
||||||
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
|
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
|
||||||
|
@ -128,7 +133,8 @@ t_trace_topic(_Config) ->
|
||||||
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||||
emqtt:subscribe(T, <<"x/y/z">>),
|
emqtt:subscribe(T, <<"x/y/z">>),
|
||||||
emqtt:unsubscribe(T, <<"x/y/z">>),
|
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||||
ct:sleep(200),
|
ok = filesync(<<"x/#">>, topic),
|
||||||
|
ok = filesync(<<"y/#">>, topic),
|
||||||
|
|
||||||
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
|
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
|
||||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||||
|
@ -152,7 +158,8 @@ t_trace_ip_address(_Config) ->
|
||||||
%% Start tracing
|
%% Start tracing
|
||||||
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
||||||
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
||||||
ct:sleep(100),
|
ok = filesync(<<"127.0.0.1">>, ip_address),
|
||||||
|
ok = filesync(<<"192.168.1.1">>, ip_address),
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
%% Verify the tracing file exits
|
||||||
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
||||||
|
@ -173,7 +180,8 @@ t_trace_ip_address(_Config) ->
|
||||||
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||||
emqtt:subscribe(T, <<"x/y/z">>),
|
emqtt:subscribe(T, <<"x/y/z">>),
|
||||||
emqtt:unsubscribe(T, <<"x/y/z">>),
|
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||||
ct:sleep(200),
|
ok = filesync(<<"127.0.0.1">>, ip_address),
|
||||||
|
ok = filesync(<<"192.168.1.1">>, ip_address),
|
||||||
|
|
||||||
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
|
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
|
||||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||||
|
@ -189,3 +197,20 @@ t_trace_ip_address(_Config) ->
|
||||||
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
|
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
|
||||||
emqtt:disconnect(T),
|
emqtt:disconnect(T),
|
||||||
?assertEqual([], emqx_trace_handler:running()).
|
?assertEqual([], emqx_trace_handler:running()).
|
||||||
|
|
||||||
|
|
||||||
|
filesync(Name, Type) ->
|
||||||
|
filesync(Name, Type, 3).
|
||||||
|
|
||||||
|
%% sometime the handler process is not started yet.
|
||||||
|
filesync(_Name, _Type, 0) -> ok;
|
||||||
|
filesync(Name, Type, Retry) ->
|
||||||
|
try
|
||||||
|
Handler = binary_to_atom(<<"trace_",
|
||||||
|
(atom_to_binary(Type))/binary, "_", Name/binary>>),
|
||||||
|
ok = logger_disk_log_h:filesync(Handler)
|
||||||
|
catch E:R ->
|
||||||
|
ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
|
||||||
|
ct:sleep(100),
|
||||||
|
filesync(Name, Type, Retry - 1)
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue