fix(test): trace_handler ct fail

This commit is contained in:
zhongwencool 2021-12-29 01:15:47 +08:00
parent 4b6bba11eb
commit 8b5b3a448a
5 changed files with 1768 additions and 42 deletions

File diff suppressed because it is too large Load Diff

View File

@ -112,7 +112,6 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
case logger_config:get(ets:whereis(logger), Id) of
{ok, #{module := Module} = HandlerConfig0} ->
HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
io:format("~p~n", [{Module, Log, HandlerConfig}]),
try Module:log(Log, HandlerConfig)
catch C:R:S ->
case logger:remove_handler(Id) of

View File

@ -50,15 +50,15 @@ end_per_testcase(_Case, _Config) ->
t_trace_clientid(_Config) ->
%% Start tracing
%% add list clientid
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
ok = emqx_trace_handler:install("CLI-client1", clientid, "client", debug, "tmp/client.log"),
ok = emqx_trace_handler:install("CLI-client2", clientid, <<"client2">>, all, "tmp/client2.log"),
ok = emqx_trace_handler:install("CLI-client3", clientid, <<"client3">>, all, "tmp/client3.log"),
{error, {handler_not_added, {file_error, ".", eisdir}}} =
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
emqx_trace:check(),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
ok = filesync(<<"CLI-client1">>, clientid),
ok = filesync(<<"CLI-client2">>, clientid),
ok = filesync(<<"CLI-client3">>, clientid),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/client.log")),
@ -66,11 +66,11 @@ t_trace_clientid(_Config) ->
?assert(filelib:is_regular("tmp/client3.log")),
%% Get current traces
?assertMatch([#{type := clientid, filter := "client", name := <<"client">>,
?assertMatch([#{type := clientid, filter := "client", name := <<"CLI-client1">>,
level := debug, dst := "tmp/client.log"},
#{type := clientid, filter := "client2", name := <<"client2">>
#{type := clientid, filter := "client2", name := <<"CLI-client2">>
, level := debug, dst := "tmp/client2.log"},
#{type := clientid, filter := "client3", name := <<"client3">>,
#{type := clientid, filter := "client3", name := <<"CLI-client3">>,
level := debug, dst := "tmp/client3.log"}
], emqx_trace_handler:running()),
@ -79,9 +79,9 @@ t_trace_clientid(_Config) ->
emqtt:connect(T),
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
emqtt:ping(T),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
ok = filesync(<<"CLI-client1">>, clientid),
ok = filesync(<<"CLI-client2">>, clientid),
ok = filesync(<<"CLI-client3">>, clientid),
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
{ok, Bin} = file:read_file("tmp/client.log"),
@ -92,24 +92,24 @@ t_trace_clientid(_Config) ->
?assert(filelib:file_size("tmp/client2.log") == 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(clientid, <<"client">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client2">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client3">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client1">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client2">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client3">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
t_trace_clientid_utf8(_) ->
Utf8Id = <<"client 漢字編碼"/utf8>>,
ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"),
ok = emqx_trace_handler:install("CLI-UTF8", clientid, Utf8Id, debug, "tmp/client-utf8.log"),
emqx_trace:check(),
{ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
emqtt:connect(T),
[begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)],
emqtt:ping(T),
ok = filesync(Utf8Id, clientid),
ok = emqx_trace_handler:uninstall(clientid, Utf8Id),
ok = filesync("CLI-UTF8", clientid),
ok = emqx_trace_handler:uninstall(clientid, "CLI-UTF8"),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()),
ok.
@ -119,11 +119,11 @@ t_trace_topic(_Config) ->
emqtt:connect(T),
%% Start tracing
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
ok = emqx_trace_handler:install("CLI-TOPIC-1", topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
ok = emqx_trace_handler:install("CLI-TOPIC-2", topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
emqx_trace:check(),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
ok = filesync("CLI-TOPIC-1", topic),
ok = filesync("CLI-TOPIC-2", topic),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
@ -131,9 +131,9 @@ t_trace_topic(_Config) ->
%% Get current traces
?assertMatch([#{type := topic, filter := <<"x/#">>,
level := debug, dst := "tmp/topic_trace_x.log", name := <<"x/#">>},
level := debug, dst := "tmp/topic_trace_x.log", name := <<"CLI-TOPIC-1">>},
#{type := topic, filter := <<"y/#">>,
name := <<"y/#">>, level := debug, dst := "tmp/topic_trace_y.log"}
name := <<"CLI-TOPIC-2">>, level := debug, dst := "tmp/topic_trace_y.log"}
],
emqx_trace_handler:running()),
@ -142,8 +142,8 @@ t_trace_topic(_Config) ->
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
ok = filesync("CLI-TOPIC-1", topic),
ok = filesync("CLI-TOPIC-2", topic),
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
@ -154,8 +154,8 @@ t_trace_topic(_Config) ->
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(topic, <<"x/#">>),
ok = emqx_trace_handler:uninstall(topic, <<"y/#">>),
ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-1">>),
ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-2">>),
{error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
@ -165,11 +165,12 @@ t_trace_ip_address(_Config) ->
emqtt:connect(T),
%% Start tracing
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
ok = emqx_trace_handler:install("CLI-IP-1", ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
ok = emqx_trace_handler:install("CLI-IP-2", ip_address,
"192.168.1.1", all, "tmp/ip_trace_y.log"),
emqx_trace:check(),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
ok = filesync(<<"CLI-IP-1">>, ip_address),
ok = filesync(<<"CLI-IP-2">>, ip_address),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
@ -177,10 +178,10 @@ t_trace_ip_address(_Config) ->
%% Get current traces
?assertMatch([#{type := ip_address, filter := "127.0.0.1",
name := <<"127.0.0.1">>,
name := <<"CLI-IP-1">>,
level := debug, dst := "tmp/ip_trace_x.log"},
#{type := ip_address, filter := "192.168.1.1",
name := <<"192.168.1.1">>,
name := <<"CLI-IP-2">>,
level := debug, dst := "tmp/ip_trace_y.log"}
],
emqx_trace_handler:running()),
@ -190,8 +191,8 @@ t_trace_ip_address(_Config) ->
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
ok = filesync(<<"CLI-IP-1">>, ip_address),
ok = filesync(<<"CLI-IP-2">>, ip_address),
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
@ -202,8 +203,8 @@ t_trace_ip_address(_Config) ->
?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-2">>),
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
@ -215,7 +216,12 @@ filesync(Name, Type) ->
%% sometime the handler process is not started yet.
filesync(_Name, _Type, 0) -> ok;
filesync(Name, Type, Retry) ->
filesync(Name0, Type, Retry) ->
Name =
case is_binary(Name0) of
true -> Name0;
false -> list_to_binary(Name0)
end,
try
Handler = binary_to_atom(<<"trace_",
(atom_to_binary(Type))/binary, "_", Name/binary>>),

View File

@ -142,11 +142,11 @@ fields(trace) ->
#{desc => """Filter type""",
nullable => false,
example => <<"clientid">>})},
{topic, hoconsc:mk(emqx_schema:unicode_binary(),
{topic, hoconsc:mk(binary(),
#{desc => """support mqtt wildcard topic.""",
nullable => true,
example => <<"/dev/#">>})},
{clientid, hoconsc:mk(emqx_schema:unicode_binary(),
{clientid, hoconsc:mk(binary(),
#{desc => """mqtt clientid.""",
nullable => true,
example => <<"dev-001">>})},

View File

@ -0,0 +1,50 @@
delayed {
enable = true
## 0 is no limit
max_delayed_messages = 0
}
observer_cli {
enable = true
}
telemetry {
enable = true
}
event_message {
"$event/client_connected" = true
"$event/client_disconnected" = true
# "$event/client_subscribed": false
# "$event/client_unsubscribed": false
# "$event/message_delivered": false
# "$event/message_acked": false
# "$event/message_dropped": false
}
topic_metrics: [
#{topic: "test/1"}
]
rewrite: [
# {
# action = publish
# source_topic = "x/#"
# re = "^x/y/(.+)$"
# dest_topic = "z/y/$1"
# },
# {
# action = subscribe
# source_topic = "x1/#"
# re = "^x1/y/(.+)$"
# dest_topic = "z1/y/$1"
# },
# {
# action = all
# source_topic = "x2/#"
# re = "^x2/y/(.+)$"
# dest_topic = "z2/y/$1"
# }
]