feat: add support ip_address trace options
This commit is contained in:
parent
4767b41eb7
commit
d76275d17d
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include("emqx_mgmt.hrl").
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_mgmt_cli]}}]).
|
||||
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
||||
|
||||
-define(PRINT_CMD(Cmd, Desc), io:format("~-48s# ~s~n", [Cmd, Desc])).
|
||||
|
||||
|
@ -38,6 +38,7 @@
|
|||
, vm/1
|
||||
, mnesia/1
|
||||
, trace/1
|
||||
, traces/1
|
||||
, log/1
|
||||
, mgmt/1
|
||||
, data/1
|
||||
|
@ -421,39 +422,43 @@ log(_) ->
|
|||
|
||||
trace(["list"]) ->
|
||||
lists:foreach(fun(Trace) ->
|
||||
#{type := Type, level := Level, dst := Dst} = Trace,
|
||||
Who = maps:get(Type, Trace),
|
||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Who, Level, Dst])
|
||||
end, emqx_tracer:lookup_traces());
|
||||
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
||||
end, emqx_trace_handler:running());
|
||||
|
||||
trace(["stop", "client", ClientId]) ->
|
||||
trace_off(clientid, ClientId);
|
||||
trace(["stop", Operation, ClientId]) ->
|
||||
case trace_type(Operation) of
|
||||
{ok, Type} -> trace_off(Type, ClientId);
|
||||
error -> trace([])
|
||||
end;
|
||||
|
||||
trace(["start", "client", ClientId, LogFile]) ->
|
||||
trace_on(clientid, ClientId, all, LogFile);
|
||||
trace(["start", Operation, ClientId, LogFile]) ->
|
||||
trace(["start", Operation, ClientId, LogFile, "all"]);
|
||||
|
||||
trace(["start", "client", ClientId, LogFile, Level]) ->
|
||||
trace_on(clientid, ClientId, list_to_atom(Level), LogFile);
|
||||
|
||||
trace(["stop", "topic", Topic]) ->
|
||||
trace_off(topic, Topic);
|
||||
|
||||
trace(["start", "topic", Topic, LogFile]) ->
|
||||
trace_on(topic, Topic, all, LogFile);
|
||||
|
||||
trace(["start", "topic", Topic, LogFile, Level]) ->
|
||||
trace_on(topic, Topic, list_to_atom(Level), LogFile);
|
||||
trace(["start", Operation, ClientId, LogFile, Level]) ->
|
||||
case trace_type(Operation) of
|
||||
{ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile);
|
||||
error -> trace([])
|
||||
end;
|
||||
|
||||
trace(_) ->
|
||||
emqx_ctl:usage([{"trace list", "List all traces started"},
|
||||
{"trace start client <ClientId> <File> [<Level>]", "Traces for a client"},
|
||||
{"trace stop client <ClientId>", "Stop tracing for a client"},
|
||||
{"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic"},
|
||||
{"trace stop topic <Topic> ", "Stop tracing for a topic"}]).
|
||||
emqx_ctl:usage([{"trace list", "List all traces started on local node"},
|
||||
{"trace start client <ClientId> <File> [<Level>]",
|
||||
"Traces for a client on local node"},
|
||||
{"trace stop client <ClientId>",
|
||||
"Stop tracing for a client on local node"},
|
||||
{"trace start topic <Topic> <File> [<Level>] ",
|
||||
"Traces for a topic on local node"},
|
||||
{"trace stop topic <Topic> ",
|
||||
"Stop tracing for a topic on local node"},
|
||||
{"trace start ip_address <IP> <File> [<Level>] ",
|
||||
"Traces for a client ip on local node"},
|
||||
{"trace stop ip_addresss <IP> ",
|
||||
"Stop tracing for a client ip on local node"}
|
||||
]).
|
||||
|
||||
-dialyzer({nowarn_function, [trace_on/4, trace_off/2]}).
|
||||
trace_on(Who, Name, Level, LogFile) ->
|
||||
case emqx_tracer:start_trace(Who, Name, Level, LogFile) of
|
||||
case emqx_trace_handler:install(Who, Name, Level, LogFile) of
|
||||
ok ->
|
||||
emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]);
|
||||
{error, Error} ->
|
||||
|
@ -461,13 +466,94 @@ trace_on(Who, Name, Level, LogFile) ->
|
|||
end.
|
||||
|
||||
trace_off(Who, Name) ->
|
||||
case emqx_tracer:stop_trace(Who, Name) of
|
||||
case emqx_trace_handler:uninstall(Who, Name) of
|
||||
ok ->
|
||||
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error])
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc Trace Cluster Command
|
||||
traces(["list"]) ->
|
||||
{ok, List} = emqx_trace_api:list_trace(get, []),
|
||||
case List of
|
||||
[] ->
|
||||
emqx_ctl:print("Cluster Trace is empty~n", []);
|
||||
_ ->
|
||||
lists:foreach(fun(Trace) ->
|
||||
#{type := Type, name := Name, status := Status,
|
||||
log_size := LogSize} = Trace,
|
||||
emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
||||
[Name, Type, maps:get(Type, Trace), Status, LogSize])
|
||||
end, List)
|
||||
end,
|
||||
length(List);
|
||||
|
||||
traces(["stop", Name]) ->
|
||||
trace_cluster_off(Name);
|
||||
|
||||
traces(["delete", Name]) ->
|
||||
trace_cluster_del(Name);
|
||||
|
||||
traces(["start", Name, Operation, Filter]) ->
|
||||
traces(["start", Name, Operation, Filter, "900"]);
|
||||
|
||||
traces(["start", Name, Operation, Filter, DurationS]) ->
|
||||
case trace_type(Operation) of
|
||||
{ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
||||
error -> traces([])
|
||||
end;
|
||||
|
||||
traces(_) ->
|
||||
emqx_ctl:usage([{"traces list", "List all cluster traces started"},
|
||||
{"traces start <Name> client <ClientId>", "Traces for a client in cluster"},
|
||||
{"traces start <Name> topic <Topic>", "Traces for a topic in cluster"},
|
||||
{"traces start <Name> ip_address <IPAddr>", "Traces for a IP in cluster"},
|
||||
{"traces stop <Name>", "Stop trace in cluster"},
|
||||
{"traces delete <Name>", "Delete trace in cluster"}
|
||||
]).
|
||||
|
||||
trace_cluster_on(Name, Type, Filter, DurationS0) ->
|
||||
case erlang:whereis(emqx_trace) of
|
||||
undefined ->
|
||||
emqx_ctl:print("[error] Tracer module not started~n"
|
||||
"Please run `emqx_ctl modules start tracer` "
|
||||
"or `emqx_ctl modules start emqx_mod_trace` first~n", []);
|
||||
_ ->
|
||||
DurationS = list_to_integer(DurationS0),
|
||||
Now = erlang:system_time(second),
|
||||
Trace = #{ name => list_to_binary(Name)
|
||||
, type => atom_to_binary(Type)
|
||||
, Type => list_to_binary(Filter)
|
||||
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Now))
|
||||
, end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
||||
},
|
||||
case emqx_trace:create(Trace) of
|
||||
ok ->
|
||||
emqx_ctl:print("Cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("[error] Cluster_trace ~s ~s=~s ~p~n",
|
||||
[Name, Type, Filter, Error])
|
||||
end
|
||||
end.
|
||||
|
||||
trace_cluster_del(Name) ->
|
||||
case emqx_trace:delete(list_to_binary(Name)) of
|
||||
ok -> emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]);
|
||||
{error, Error} -> emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error])
|
||||
end.
|
||||
|
||||
trace_cluster_off(Name) ->
|
||||
case emqx_trace:update(list_to_binary(Name), false) of
|
||||
ok -> emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]);
|
||||
{error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
|
||||
end.
|
||||
|
||||
trace_type("client") -> {ok, clientid};
|
||||
trace_type("topic") -> {ok, topic};
|
||||
trace_type("ip_address") -> {ok, ip_address};
|
||||
trace_type(_) -> error.
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc Listeners Command
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ groups() ->
|
|||
t_vm_cmd,
|
||||
t_plugins_cmd,
|
||||
t_trace_cmd,
|
||||
t_traces_cmd,
|
||||
t_broker_cmd,
|
||||
t_router_cmd,
|
||||
t_subscriptions_cmd,
|
||||
|
@ -64,6 +65,23 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps(apps()).
|
||||
|
||||
init_per_testcase(t_plugins_cmd, Config) ->
|
||||
meck:new(emqx_plugins, [non_strict, passthrough]),
|
||||
meck:expect(emqx_plugins, load, fun(_) -> ok end),
|
||||
meck:expect(emqx_plugins, unload, fun(_) -> ok end),
|
||||
meck:expect(emqx_plugins, reload, fun(_) -> ok end),
|
||||
mock_print(),
|
||||
Config;
|
||||
init_per_testcase(_Case, Config) ->
|
||||
mock_print(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_plugins_cmd, _Config) ->
|
||||
meck:unload(emqx_plugins),
|
||||
unmock_print();
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
unmock_print().
|
||||
|
||||
t_app(_Config) ->
|
||||
{ok, AppSecret} = emqx_mgmt_auth:add_app(<<"app_id">>, <<"app_name">>),
|
||||
?assert(emqx_mgmt_auth:is_authorized(<<"app_id">>, AppSecret)),
|
||||
|
@ -96,7 +114,6 @@ t_app(_Config) ->
|
|||
ok.
|
||||
|
||||
t_log_cmd(_) ->
|
||||
mock_print(),
|
||||
lists:foreach(fun(Level) ->
|
||||
emqx_mgmt_cli:log(["primary-level", Level]),
|
||||
?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["primary-level"]))
|
||||
|
@ -109,12 +126,9 @@ t_log_cmd(_) ->
|
|||
?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["handlers", "set-level",
|
||||
atom_to_list(Id), Level]))
|
||||
end, ?LOG_LEVELS)
|
||||
|| #{id := Id} <- emqx_logger:get_log_handlers()],
|
||||
meck:unload().
|
||||
|| #{id := Id} <- emqx_logger:get_log_handlers()].
|
||||
|
||||
t_mgmt_cmd(_) ->
|
||||
% ct:pal("start testing the mgmt command"),
|
||||
mock_print(),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
|
||||
["lookup", "emqx_appid"]), "Not Found.")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
|
||||
|
@ -127,28 +141,19 @@ t_mgmt_cmd(_) ->
|
|||
["update", "emqx_appid", "ts"]), "update successfully")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
|
||||
["delete", "emqx_appid"]), "ok")),
|
||||
ok = emqx_mgmt_cli:mgmt(["list"]),
|
||||
meck:unload().
|
||||
ok = emqx_mgmt_cli:mgmt(["list"]).
|
||||
|
||||
t_status_cmd(_) ->
|
||||
% ct:pal("start testing status command"),
|
||||
mock_print(),
|
||||
%% init internal status seem to be always 'starting' when running ct tests
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")),
|
||||
meck:unload().
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")).
|
||||
|
||||
t_broker_cmd(_) ->
|
||||
% ct:pal("start testing the broker command"),
|
||||
mock_print(),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([]), "sysdescr")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["stats"]), "subscriptions.shared")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["metrics"]), "bytes.sent")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([undefined]), "broker")),
|
||||
meck:unload().
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([undefined]), "broker")).
|
||||
|
||||
t_clients_cmd(_) ->
|
||||
% ct:pal("start testing the client command"),
|
||||
mock_print(),
|
||||
process_flag(trap_exit, true),
|
||||
{ok, T} = emqtt:start_link([{clientid, <<"client12">>},
|
||||
{username, <<"testuser1">>},
|
||||
|
@ -164,7 +169,6 @@ t_clients_cmd(_) ->
|
|||
receive
|
||||
{'EXIT', T, _} ->
|
||||
ok
|
||||
% ct:pal("Connection closed: ~p~n", [Reason])
|
||||
after
|
||||
500 ->
|
||||
erlang:error("Client is not kick")
|
||||
|
@ -179,10 +183,11 @@ t_clients_cmd(_) ->
|
|||
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
|
||||
timer:sleep(300),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "client13")),
|
||||
meck:unload().
|
||||
|
||||
% emqx_mgmt_cli:clients(["kick", "client13"]),
|
||||
% timer:sleep(500),
|
||||
% ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "Not Found")).
|
||||
ok.
|
||||
|
||||
raw_recv_pase(Packet) ->
|
||||
emqx_frame:parse(Packet).
|
||||
|
@ -191,8 +196,6 @@ raw_send_serialize(Packet) ->
|
|||
emqx_frame:serialize(Packet).
|
||||
|
||||
t_vm_cmd(_) ->
|
||||
% ct:pal("start testing the vm command"),
|
||||
mock_print(),
|
||||
[[?assertMatch({match, _}, re:run(Result, Name))
|
||||
|| Result <- emqx_mgmt_cli:vm([Name])]
|
||||
|| Name <- ["load", "memory", "process", "io", "ports"]],
|
||||
|
@ -205,12 +208,9 @@ t_vm_cmd(_) ->
|
|||
[?assertMatch({match, _}, re:run(Result, "io"))
|
||||
|| Result <- emqx_mgmt_cli:vm(["io"])],
|
||||
[?assertMatch({match, _}, re:run(Result, "ports"))
|
||||
|| Result <- emqx_mgmt_cli:vm(["ports"])],
|
||||
unmock_print().
|
||||
|| Result <- emqx_mgmt_cli:vm(["ports"])].
|
||||
|
||||
t_trace_cmd(_) ->
|
||||
% ct:pal("start testing the trace command"),
|
||||
mock_print(),
|
||||
logger:set_primary_config(level, debug),
|
||||
{ok, T} = emqtt:start_link([{clientid, <<"client">>},
|
||||
{username, <<"testuser">>},
|
||||
|
@ -237,12 +237,34 @@ t_trace_cmd(_) ->
|
|||
Trace7 = emqx_mgmt_cli:trace(["start", "topic", "a/b/c",
|
||||
"log/clientid_trace.log", "error"]),
|
||||
?assertMatch({match, _}, re:run(Trace7, "successfully")),
|
||||
logger:set_primary_config(level, error),
|
||||
unmock_print().
|
||||
logger:set_primary_config(level, error).
|
||||
|
||||
t_traces_cmd(_) ->
|
||||
emqx_trace:mnesia(boot),
|
||||
Count1 = emqx_mgmt_cli:traces(["list"]),
|
||||
?assertEqual(0, Count1),
|
||||
Error1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]),
|
||||
?assertMatch({match, _}, re:run(Error1, "Tracer module not started")),
|
||||
emqx_trace:start_link(),
|
||||
Trace1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]),
|
||||
?assertMatch({match, _}, re:run(Trace1, "successfully")),
|
||||
Count2 = emqx_mgmt_cli:traces(["list"]),
|
||||
?assertEqual(1, Count2),
|
||||
Error2 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]),
|
||||
?assertMatch({match, _}, re:run(Error2, "already_existed")),
|
||||
Trace2 = emqx_mgmt_cli:traces(["stop", "test-name"]),
|
||||
?assertMatch({match, _}, re:run(Trace2, "successfully")),
|
||||
Count3 = emqx_mgmt_cli:traces(["list"]),
|
||||
?assertEqual(1, Count3),
|
||||
Trace3 = emqx_mgmt_cli:traces(["delete", "test-name"]),
|
||||
?assertMatch({match, _}, re:run(Trace3, "successfully")),
|
||||
Count4 = emqx_mgmt_cli:traces(["list"]),
|
||||
?assertEqual(0, Count4),
|
||||
Error3 = emqx_mgmt_cli:traces(["delete", "test-name"]),
|
||||
?assertMatch({match, _}, re:run(Error3, "not_found")),
|
||||
ok.
|
||||
|
||||
t_router_cmd(_) ->
|
||||
% ct:pal("start testing the router command"),
|
||||
mock_print(),
|
||||
{ok, T} = emqtt:start_link([{clientid, <<"client1">>},
|
||||
{username, <<"testuser1">>},
|
||||
{password, <<"pass1">>}
|
||||
|
@ -257,12 +279,9 @@ t_router_cmd(_) ->
|
|||
emqtt:connect(T1),
|
||||
emqtt:subscribe(T1, <<"a/b/c/d">>),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["list"]), "a/b/c | a/b/c")),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["show", "a/b/c"]), "a/b/c")),
|
||||
unmock_print().
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["show", "a/b/c"]), "a/b/c")).
|
||||
|
||||
t_subscriptions_cmd(_) ->
|
||||
% ct:pal("Start testing the subscriptions command"),
|
||||
mock_print(),
|
||||
{ok, T3} = emqtt:start_link([{clientid, <<"client">>},
|
||||
{username, <<"testuser">>},
|
||||
{password, <<"pass">>}
|
||||
|
@ -273,22 +292,18 @@ t_subscriptions_cmd(_) ->
|
|||
[?assertMatch({match, _} , re:run(Result, "b/b/c"))
|
||||
|| Result <- emqx_mgmt_cli:subscriptions(["show", <<"client">>])],
|
||||
?assertEqual(emqx_mgmt_cli:subscriptions(["add", "client", "b/b/c", "0"]), "ok~n"),
|
||||
?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"),
|
||||
unmock_print().
|
||||
?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n").
|
||||
|
||||
t_listeners_cmd_old(_) ->
|
||||
ok = emqx_listeners:ensure_all_started(),
|
||||
mock_print(),
|
||||
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
|
||||
?assertEqual(
|
||||
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
|
||||
emqx_mgmt_cli:listeners(["stop", "wss", "8084"])
|
||||
),
|
||||
unmock_print().
|
||||
).
|
||||
|
||||
t_listeners_cmd_new(_) ->
|
||||
ok = emqx_listeners:ensure_all_started(),
|
||||
mock_print(),
|
||||
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
|
||||
?assertEqual(
|
||||
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
|
||||
|
@ -304,16 +319,11 @@ t_listeners_cmd_new(_) ->
|
|||
),
|
||||
?assertEqual(
|
||||
emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]),
|
||||
"Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n"
|
||||
),
|
||||
unmock_print().
|
||||
"Failed to restart bad:listener:identifier listener: "
|
||||
"{no_such_listener,\"bad:listener:identifier\"}\n"
|
||||
).
|
||||
|
||||
t_plugins_cmd(_) ->
|
||||
mock_print(),
|
||||
meck:new(emqx_plugins, [non_strict, passthrough]),
|
||||
meck:expect(emqx_plugins, load, fun(_) -> ok end),
|
||||
meck:expect(emqx_plugins, unload, fun(_) -> ok end),
|
||||
meck:expect(emqx_plugins, reload, fun(_) -> ok end),
|
||||
?assertEqual(emqx_mgmt_cli:plugins(["list"]), ok),
|
||||
?assertEqual(
|
||||
emqx_mgmt_cli:plugins(["unload", "emqx_auth_mnesia"]),
|
||||
|
@ -326,11 +336,9 @@ t_plugins_cmd(_) ->
|
|||
?assertEqual(
|
||||
emqx_mgmt_cli:plugins(["unload", "emqx_management"]),
|
||||
"Plugin emqx_management can not be unloaded.~n"
|
||||
),
|
||||
unmock_print().
|
||||
).
|
||||
|
||||
t_cli(_) ->
|
||||
mock_print(),
|
||||
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([""]), "status")),
|
||||
[?assertMatch({match, _}, re:run(Value, "broker"))
|
||||
|| Value <- emqx_mgmt_cli:broker([""])],
|
||||
|
@ -352,9 +360,10 @@ t_cli(_) ->
|
|||
|| Value <- emqx_mgmt_cli:mnesia([""])],
|
||||
[?assertMatch({match, _}, re:run(Value, "trace"))
|
||||
|| Value <- emqx_mgmt_cli:trace([""])],
|
||||
[?assertMatch({match, _}, re:run(Value, "traces"))
|
||||
|| Value <- emqx_mgmt_cli:traces([""])],
|
||||
[?assertMatch({match, _}, re:run(Value, "mgmt"))
|
||||
|| Value <- emqx_mgmt_cli:mgmt([""])],
|
||||
unmock_print().
|
||||
|| Value <- emqx_mgmt_cli:mgmt([""])].
|
||||
|
||||
mock_print() ->
|
||||
catch meck:unload(emqx_ctl),
|
||||
|
|
|
@ -18,10 +18,9 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-logger_header("[Trace]").
|
||||
-logger_header("[Tracer]").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
@ -29,6 +28,11 @@
|
|||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
-export([ publish/1
|
||||
, subscribe/3
|
||||
, unsubscribe/2
|
||||
]).
|
||||
|
||||
-export([ start_link/0
|
||||
, list/0
|
||||
, list/1
|
||||
|
@ -41,6 +45,7 @@
|
|||
|
||||
-export([ format/1
|
||||
, zip_dir/0
|
||||
, filename/2
|
||||
, trace_dir/0
|
||||
, trace_file/1
|
||||
, delete_files_after_send/2
|
||||
|
@ -49,23 +54,22 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(TRACE, ?MODULE).
|
||||
-define(PACKETS, tuple_to_list(?TYPE_NAMES)).
|
||||
-define(MAX_SIZE, 30).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([log_file/2]).
|
||||
-endif.
|
||||
|
||||
-export_type([ip_address/0]).
|
||||
-type ip_address() :: string().
|
||||
|
||||
-record(?TRACE,
|
||||
{ name :: binary() | undefined | '_'
|
||||
, type :: clientid | topic | undefined | '_'
|
||||
, topic :: emqx_types:topic() | undefined | '_'
|
||||
, clientid :: emqx_types:clientid() | undefined | '_'
|
||||
, packets = [] :: list() | '_'
|
||||
, type :: clientid | topic | ip_address | undefined | '_'
|
||||
, filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
|
||||
, enable = true :: boolean() | '_'
|
||||
, start_at :: integer() | undefined | binary() | '_'
|
||||
, end_at :: integer() | undefined | binary() | '_'
|
||||
, log_size = #{} :: map() | '_'
|
||||
, start_at :: integer() | undefined | '_'
|
||||
, end_at :: integer() | undefined | '_'
|
||||
}).
|
||||
|
||||
mnesia(boot) ->
|
||||
|
@ -77,6 +81,31 @@ mnesia(boot) ->
|
|||
mnesia(copy) ->
|
||||
ok = ekka_mnesia:copy_table(?TRACE, disc_copies).
|
||||
|
||||
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
||||
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
||||
is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info(
|
||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||
"PUBLISH to ~s: ~0p",
|
||||
[Topic, Payload]
|
||||
).
|
||||
|
||||
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
||||
subscribe(Topic, SubId, SubOpts) ->
|
||||
emqx_logger:info(
|
||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||
"~ts SUBSCRIBE ~ts: Options: ~0p",
|
||||
[SubId, Topic, SubOpts]
|
||||
).
|
||||
|
||||
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
||||
unsubscribe(Topic, SubOpts) ->
|
||||
emqx_logger:info(
|
||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||
"~ts UNSUBSCRIBE ~ts: Options: ~0p",
|
||||
[maps:get(subid, SubOpts, ""), Topic, SubOpts]
|
||||
).
|
||||
|
||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
@ -89,18 +118,18 @@ list() ->
|
|||
list(Enable) ->
|
||||
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
|
||||
|
||||
-spec create([{Key :: binary(), Value :: binary()}]) ->
|
||||
-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
|
||||
ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
|
||||
create(Trace) ->
|
||||
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
||||
true ->
|
||||
case to_trace(Trace) of
|
||||
{ok, TraceRec} -> create_new_trace(TraceRec);
|
||||
{ok, TraceRec} -> insert_new_trace(TraceRec);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
false ->
|
||||
{error, """The number of traces created has reached the maximum,
|
||||
please delete the useless ones first"""}
|
||||
{error, "The number of traces created has reached the maximum"
|
||||
" please delete the useless ones first"}
|
||||
end.
|
||||
|
||||
-spec delete(Name :: binary()) -> ok | {error, not_found}.
|
||||
|
@ -163,12 +192,8 @@ delete_files_after_send(TraceLog, Zips) ->
|
|||
-spec format(list(#?TRACE{})) -> list(map()).
|
||||
format(Traces) ->
|
||||
Fields = record_info(fields, ?TRACE),
|
||||
lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) ->
|
||||
Trace = Trace0#?TRACE{
|
||||
start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)),
|
||||
end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt))
|
||||
},
|
||||
[_ | Values] = tuple_to_list(Trace),
|
||||
lists:map(fun(Trace0 = #?TRACE{}) ->
|
||||
[_ | Values] = tuple_to_list(Trace0),
|
||||
maps:from_list(lists:zip(Fields, Values))
|
||||
end, Traces).
|
||||
|
||||
|
@ -198,7 +223,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
|
|||
case maps:take(Pid, Monitors) of
|
||||
error -> {noreply, State};
|
||||
{Files, NewMonitors} ->
|
||||
lists:foreach(fun(F) -> file:delete(F) end, Files),
|
||||
lists:foreach(fun file:delete/1, Files),
|
||||
{noreply, State#{monitors => NewMonitors}}
|
||||
end;
|
||||
handle_info({timeout, TRef, update_trace},
|
||||
|
@ -227,14 +252,12 @@ terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
|
|||
code_change(_, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
create_new_trace(Trace) ->
|
||||
insert_new_trace(Trace) ->
|
||||
Tran = fun() ->
|
||||
case mnesia:read(?TRACE, Trace#?TRACE.name) of
|
||||
[] ->
|
||||
#?TRACE{start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets} = Trace,
|
||||
Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic,
|
||||
clientid = ClientId, packets = Packets},
|
||||
#?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
|
||||
Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
|
||||
case mnesia:match_object(?TRACE, Match, read) of
|
||||
[] -> mnesia:write(?TRACE, Trace, write);
|
||||
[#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
|
||||
|
@ -248,7 +271,7 @@ update_trace(Traces) ->
|
|||
Now = erlang:system_time(second),
|
||||
{_Waiting, Running, Finished} = classify_by_time(Traces, Now),
|
||||
disable_finished(Finished),
|
||||
Started = already_running(),
|
||||
Started = emqx_trace_handler:running(),
|
||||
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
||||
NeedStop = AllStarted -- NeedRunning,
|
||||
ok = stop_trace(NeedStop, Started),
|
||||
|
@ -257,14 +280,8 @@ update_trace(Traces) ->
|
|||
emqx_misc:start_timer(NextTime, update_trace).
|
||||
|
||||
stop_all_trace_handler() ->
|
||||
lists:foreach(fun(#{type := Type, name := Name} = Trace) ->
|
||||
_ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name)
|
||||
end
|
||||
, already_running()).
|
||||
|
||||
already_running() ->
|
||||
emqx_tracer:lookup_traces().
|
||||
|
||||
lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
|
||||
emqx_trace_handler:running()).
|
||||
get_enable_trace() ->
|
||||
{atomic, Traces} =
|
||||
mnesia:transaction(fun() ->
|
||||
|
@ -286,31 +303,24 @@ find_closest_time(Traces, Now) ->
|
|||
|
||||
disable_finished([]) -> ok;
|
||||
disable_finished(Traces) ->
|
||||
NameWithLogSize =
|
||||
lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) ->
|
||||
FileSize = filelib:file_size(log_file(Name, StartAt)),
|
||||
{Name, FileSize} end, Traces),
|
||||
transaction(fun() ->
|
||||
lists:map(fun({Name, LogSize}) ->
|
||||
lists:map(fun(#?TRACE{name = Name}) ->
|
||||
case mnesia:read(?TRACE, Name, write) of
|
||||
[] -> ok;
|
||||
[Trace = #?TRACE{log_size = Logs}] ->
|
||||
mnesia:write(?TRACE, Trace#?TRACE{enable = false,
|
||||
log_size = Logs#{node() => LogSize}}, write)
|
||||
end end, NameWithLogSize)
|
||||
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
|
||||
end end, Traces)
|
||||
end).
|
||||
|
||||
start_trace(Traces, Started0) ->
|
||||
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
|
||||
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
|
||||
case lists:member(Name, StartedAcc) of
|
||||
true -> {[Name | Running], StartedAcc};
|
||||
true ->
|
||||
{[Name | Running], StartedAcc};
|
||||
false ->
|
||||
case start_trace(Trace) of
|
||||
ok -> {[Name | Running], [Name | StartedAcc]};
|
||||
Error ->
|
||||
?LOG(error, "(~p)start trace failed by:~p", [Name, Error]),
|
||||
{[Name | Running], StartedAcc}
|
||||
{error, _Reason} -> {[Name | Running], StartedAcc}
|
||||
end
|
||||
end
|
||||
end, {[], Started}, Traces).
|
||||
|
@ -318,27 +328,16 @@ start_trace(Traces, Started0) ->
|
|||
start_trace(Trace) ->
|
||||
#?TRACE{name = Name
|
||||
, type = Type
|
||||
, clientid = ClientId
|
||||
, topic = Topic
|
||||
, packets = Packets
|
||||
, filter = Filter
|
||||
, start_at = Start
|
||||
} = Trace,
|
||||
Who0 = #{name => Name, labels => Packets},
|
||||
Who =
|
||||
case Type of
|
||||
topic -> Who0#{type => topic, topic => Topic};
|
||||
clientid -> Who0#{type => clientid, clientid => ClientId}
|
||||
end,
|
||||
case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of
|
||||
ok -> ok;
|
||||
{error, {already_exist, _}} -> ok;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
Who = #{name => Name, type => Type, filter => Filter},
|
||||
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
|
||||
|
||||
stop_trace(Finished, Started) ->
|
||||
lists:foreach(fun(#{name := Name, type := Type} = Trace) ->
|
||||
lists:foreach(fun(#{name := Name, type := Type}) ->
|
||||
case lists:member(Name, Finished) of
|
||||
true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name);
|
||||
true -> emqx_trace_handler:uninstall(Type, Name);
|
||||
false -> ok
|
||||
end
|
||||
end, Started).
|
||||
|
@ -371,23 +370,31 @@ classify_by_time([Trace = #?TRACE{end_at = End} | Traces],
|
|||
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
|
||||
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
||||
|
||||
to_trace(TraceList) ->
|
||||
case to_trace(TraceList, #?TRACE{}) of
|
||||
to_trace(TraceParam) ->
|
||||
case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
{ok, #?TRACE{name = undefined}} ->
|
||||
{error, "name required"};
|
||||
{ok, #?TRACE{type = undefined}} ->
|
||||
{error, "type required"};
|
||||
{ok, #?TRACE{topic = undefined, clientid = undefined}} ->
|
||||
{error, "topic/clientid cannot be both empty"};
|
||||
{ok, Trace} ->
|
||||
case fill_default(Trace) of
|
||||
{error, "type=[topic,clientid,ip_address] required"};
|
||||
{ok, #?TRACE{filter = undefined}} ->
|
||||
{error, "topic/clientid/ip_address filter required"};
|
||||
{ok, TraceRec0} ->
|
||||
case fill_default(TraceRec0) of
|
||||
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
||||
{error, "failed by start_at >= end_at"};
|
||||
Trace1 -> {ok, Trace1}
|
||||
TraceRec -> {ok, TraceRec}
|
||||
end
|
||||
end.
|
||||
|
||||
ensure_proplists(#{} = Trace) -> maps:to_list(Trace);
|
||||
ensure_proplists(Trace) when is_list(Trace) ->
|
||||
lists:foldl(
|
||||
fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc];
|
||||
({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc];
|
||||
(_, Acc) -> Acc
|
||||
end, [], Trace).
|
||||
|
||||
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
||||
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
||||
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
||||
|
@ -395,27 +402,35 @@ fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
|||
fill_default(Trace) -> Trace.
|
||||
|
||||
to_trace([], Rec) -> {ok, Rec};
|
||||
to_trace([{<<"name">>, Name} | Trace], Rec) ->
|
||||
to_trace([{name, Name} | Trace], Rec) ->
|
||||
case binary:match(Name, [<<"/">>], []) of
|
||||
nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
|
||||
nomatch when byte_size(Name) < 200 -> to_trace(Trace, Rec#?TRACE{name = Name});
|
||||
nomatch -> {error, "name(latin1) length must < 200"};
|
||||
_ -> {error, "name cannot contain /"}
|
||||
end;
|
||||
to_trace([{<<"type">>, Type} | Trace], Rec) ->
|
||||
case lists:member(Type, [<<"clientid">>, <<"topic">>]) of
|
||||
to_trace([{type, Type} | Trace], Rec) ->
|
||||
case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of
|
||||
true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
|
||||
false -> {error, "incorrect type: only support clientid/topic"}
|
||||
false -> {error, "incorrect type: only support clientid/topic/ip_address"}
|
||||
end;
|
||||
to_trace([{<<"topic">>, Topic} | Trace], Rec) ->
|
||||
to_trace([{topic, Topic} | Trace], Rec) ->
|
||||
case validate_topic(Topic) of
|
||||
ok -> to_trace(Trace, Rec#?TRACE{topic = Topic});
|
||||
ok -> to_trace(Trace, Rec#?TRACE{filter = Topic});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"start_at">>, StartAt} | Trace], Rec) ->
|
||||
to_trace([{clientid, ClientId} | Trace], Rec) ->
|
||||
to_trace(Trace, Rec#?TRACE{filter = ClientId});
|
||||
to_trace([{ip_address, IP} | Trace], Rec) ->
|
||||
case inet:parse_address(binary_to_list(IP)) of
|
||||
{ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)});
|
||||
{error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
|
||||
end;
|
||||
to_trace([{start_at, StartAt} | Trace], Rec) ->
|
||||
case to_system_second(StartAt) of
|
||||
{ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
to_trace([{<<"end_at">>, EndAt} | Trace], Rec) ->
|
||||
to_trace([{end_at, EndAt} | Trace], Rec) ->
|
||||
Now = erlang:system_time(second),
|
||||
case to_system_second(EndAt) of
|
||||
{ok, Sec} when Sec > Now ->
|
||||
|
@ -425,21 +440,14 @@ to_trace([{<<"end_at">>, EndAt} | Trace], Rec) ->
|
|||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
to_trace([{<<"clientid">>, ClientId} | Trace], Rec) ->
|
||||
to_trace(Trace, Rec#?TRACE{clientid = ClientId});
|
||||
to_trace([{<<"packets">>, PacketList} | Trace], Rec) ->
|
||||
case to_packets(PacketList) of
|
||||
{ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets});
|
||||
{error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])}
|
||||
end;
|
||||
to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
|
||||
|
||||
validate_topic(TopicName) ->
|
||||
try emqx_topic:validate(name, TopicName) of
|
||||
try emqx_topic:validate(filter, TopicName) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error ->
|
||||
{error, io_lib:format("~s invalid by ~p", [TopicName, Error])}
|
||||
{error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
|
||||
end.
|
||||
|
||||
to_system_second(At) ->
|
||||
|
@ -450,14 +458,6 @@ to_system_second(At) ->
|
|||
{error, ["The rfc3339 specification not satisfied: ", At]}
|
||||
end.
|
||||
|
||||
to_packets(Packets) when is_list(Packets) ->
|
||||
AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets),
|
||||
case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of
|
||||
[] -> {ok, AtomTypes};
|
||||
InvalidE -> {error, InvalidE}
|
||||
end;
|
||||
to_packets(Packets) -> {error, Packets}.
|
||||
|
||||
zip_dir() ->
|
||||
trace_dir() ++ "zip/".
|
||||
|
||||
|
|
|
@ -27,18 +27,37 @@
|
|||
, download_zip_log/2
|
||||
, stream_log_file/2
|
||||
]).
|
||||
-export([read_trace_file/3]).
|
||||
-export([ read_trace_file/3
|
||||
, get_trace_size/0
|
||||
]).
|
||||
|
||||
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
||||
-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])}).
|
||||
-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}).
|
||||
|
||||
list_trace(_, Params) ->
|
||||
List =
|
||||
case Params of
|
||||
[{<<"enable">>, Enable}] -> emqx_trace:list(binary_to_existing_atom(Enable));
|
||||
_ -> emqx_trace:list()
|
||||
end,
|
||||
{ok, emqx_trace:format(List)}.
|
||||
list_trace(_, _Params) ->
|
||||
case emqx_trace:list() of
|
||||
[] -> {ok, []};
|
||||
List0 ->
|
||||
List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List0),
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000),
|
||||
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
|
||||
Now = erlang:system_time(second),
|
||||
Traces =
|
||||
lists:map(fun(Trace = #{name := Name, start_at := Start,
|
||||
end_at := End, enable := Enable, type := Type, filter := Filter}) ->
|
||||
FileName = emqx_trace:filename(Name, Start),
|
||||
LogSize = collect_file_size(Nodes, FileName, AllFileSize),
|
||||
Trace0 = maps:without([enable, filter], Trace),
|
||||
Trace0#{ log_size => LogSize
|
||||
, Type => Filter
|
||||
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
|
||||
, end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
|
||||
, status => status(Enable, Start, End, Now)
|
||||
}
|
||||
end, emqx_trace:format(List)),
|
||||
{ok, Traces}
|
||||
end.
|
||||
|
||||
create_trace(_, Param) ->
|
||||
case emqx_trace:create(Param) of
|
||||
|
@ -97,28 +116,48 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
|||
end, [], TraceFiles).
|
||||
|
||||
collect_trace_file(TraceLog) ->
|
||||
cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
|
||||
|
||||
cluster_call(Mod, Fun, Args, Timeout) ->
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
{Files, BadNodes} = rpc:multicall(Nodes, emqx_trace, trace_file, [TraceLog], 60000),
|
||||
BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]),
|
||||
Files.
|
||||
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
||||
BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
|
||||
GoodRes.
|
||||
|
||||
stream_log_file(#{name := Name}, Params) ->
|
||||
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
|
||||
Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
|
||||
Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
|
||||
Node = binary_to_existing_atom(Node0),
|
||||
Position = binary_to_integer(Position0),
|
||||
Bytes = binary_to_integer(Bytes0),
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
{ok, Bin} ->
|
||||
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => Bin}};
|
||||
{eof, Size} ->
|
||||
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => <<"">>}};
|
||||
{error, Reason} ->
|
||||
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
|
||||
{error, Reason}
|
||||
case to_node(Node0) of
|
||||
{ok, Node} ->
|
||||
Position = binary_to_integer(Position0),
|
||||
Bytes = binary_to_integer(Bytes0),
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
{ok, Bin} ->
|
||||
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => Bin}};
|
||||
{eof, Size} ->
|
||||
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => <<"">>}};
|
||||
{error, Reason} ->
|
||||
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
|
||||
{error, Reason};
|
||||
{badrpc, nodedown} ->
|
||||
{error, "BadRpc node down"}
|
||||
end;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
get_trace_size() ->
|
||||
TraceDir = emqx_trace:trace_dir(),
|
||||
Node = node(),
|
||||
case file:list_dir(TraceDir) of
|
||||
{ok, AllFiles} ->
|
||||
lists:foldl(fun(File, Acc) ->
|
||||
FullFileName = filename:join(TraceDir, File),
|
||||
Acc#{{Node, File} => filelib:file_size(FullFileName)}
|
||||
end, #{}, lists:delete("zip", AllFiles));
|
||||
_ -> #{}
|
||||
end.
|
||||
|
||||
%% this is an rpc call for stream_log_file/2
|
||||
|
@ -134,7 +173,6 @@ read_trace_file(Name, Position, Limit) ->
|
|||
[] -> {error, not_found}
|
||||
end.
|
||||
|
||||
-dialyzer({nowarn_function, read_file/3}).
|
||||
read_file(Path, Offset, Bytes) ->
|
||||
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
|
||||
try
|
||||
|
@ -146,9 +184,27 @@ read_file(Path, Offset, Bytes) ->
|
|||
{ok, Bin} -> {ok, Bin};
|
||||
{error, Reason} -> {error, Reason};
|
||||
eof ->
|
||||
#file_info{size = Size} = file:read_file_info(IoDevice),
|
||||
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
||||
{eof, Size}
|
||||
end
|
||||
after
|
||||
file:close(IoDevice)
|
||||
end.
|
||||
|
||||
to_node(Node) ->
|
||||
try {ok, binary_to_existing_atom(Node)}
|
||||
catch _:_ ->
|
||||
{error, "node not found"}
|
||||
end.
|
||||
|
||||
collect_file_size(Nodes, FileName, AllFiles) ->
|
||||
lists:foldl(fun(Node, Acc) ->
|
||||
Size = maps:get({Node, FileName}, AllFiles, 0),
|
||||
Acc#{Node => Size}
|
||||
end, #{}, Nodes).
|
||||
|
||||
%% status(false, _Start, End, Now) when End > Now -> <<"stopped">>;
|
||||
status(false, _Start, _End, _Now) -> <<"stopped">>;
|
||||
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
|
||||
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
|
||||
status(true, _Start, _End, _Now) -> <<"running">>.
|
||||
|
|
|
@ -23,21 +23,7 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
-record(emqx_trace, {
|
||||
name,
|
||||
type,
|
||||
topic,
|
||||
clientid,
|
||||
packets = [],
|
||||
enable = true,
|
||||
start_at,
|
||||
end_at,
|
||||
log_size = #{}
|
||||
}).
|
||||
|
||||
-define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL'
|
||||
, 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK'
|
||||
, 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']).
|
||||
-record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
|
@ -61,16 +47,14 @@ t_base_create_delete(_Config) ->
|
|||
End = to_rfc3339(Now + 30 * 60),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"start_at">>, Start},
|
||||
{<<"end_at">>, End}
|
||||
],
|
||||
AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}),
|
||||
Trace = #{
|
||||
name => Name,
|
||||
type => <<"clientid">>,
|
||||
clientid => ClientId,
|
||||
start_at => Start,
|
||||
end_at => End
|
||||
},
|
||||
AnotherTrace = Trace#{name => <<"anotherTrace">>},
|
||||
ok = emqx_trace:create(Trace),
|
||||
?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)),
|
||||
?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)),
|
||||
|
@ -78,24 +62,19 @@ t_base_create_delete(_Config) ->
|
|||
Expect = #emqx_trace{
|
||||
name = Name,
|
||||
type = clientid,
|
||||
topic = undefined,
|
||||
clientid = ClientId,
|
||||
packets = ?PACKETS,
|
||||
filter = ClientId,
|
||||
start_at = Now,
|
||||
end_at = Now + 30 * 60
|
||||
},
|
||||
?assertEqual(Expect, TraceRec),
|
||||
ExpectFormat = [
|
||||
#{
|
||||
clientid => <<"test-device">>,
|
||||
filter => <<"test-device">>,
|
||||
enable => true,
|
||||
type => clientid,
|
||||
packets => ?PACKETS,
|
||||
name => <<"name1">>,
|
||||
start_at => Start,
|
||||
end_at => End,
|
||||
log_size => #{},
|
||||
topic => undefined
|
||||
start_at => Now,
|
||||
end_at => Now + 30 * 60
|
||||
}
|
||||
],
|
||||
?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])),
|
||||
|
@ -108,13 +87,12 @@ t_create_size_max(_Config) ->
|
|||
emqx_trace:clear(),
|
||||
lists:map(fun(Seq) ->
|
||||
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||
Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||
Trace = [{name, Name}, {type, <<"topic">>},
|
||||
{topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||
ok = emqx_trace:create(Trace)
|
||||
end, lists:seq(1, 30)),
|
||||
Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}],
|
||||
Trace31 = [{<<"name">>, <<"name31">>},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}],
|
||||
{error, _} = emqx_trace:create(Trace31),
|
||||
ok = emqx_trace:delete(<<"name30">>),
|
||||
ok = emqx_trace:create(Trace31),
|
||||
|
@ -125,11 +103,11 @@ t_create_failed(_Config) ->
|
|||
ok = emqx_trace:clear(),
|
||||
UnknownField = [{<<"unknown">>, 12}],
|
||||
{error, Reason1} = emqx_trace:create(UnknownField),
|
||||
?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)),
|
||||
?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
|
||||
|
||||
InvalidTopic = [{<<"topic">>, "#/#//"}],
|
||||
{error, Reason2} = emqx_trace:create(InvalidTopic),
|
||||
?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||
?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||
|
||||
InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
|
||||
{error, Reason3} = emqx_trace:create(InvalidStart),
|
||||
|
@ -141,41 +119,34 @@ t_create_failed(_Config) ->
|
|||
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||
iolist_to_binary(Reason4)),
|
||||
|
||||
InvalidPackets = [{<<"packets">>, [<<"publish">>]}],
|
||||
{error, Reason5} = emqx_trace:create(InvalidPackets),
|
||||
?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)),
|
||||
|
||||
InvalidPackets2 = [{<<"packets">>, <<"publish">>}],
|
||||
{error, Reason6} = emqx_trace:create(InvalidPackets2),
|
||||
?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)),
|
||||
|
||||
{error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
|
||||
?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)),
|
||||
?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)),
|
||||
|
||||
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
|
||||
{<<"type">>, <<"clientid">>}],
|
||||
{error, Reason9} = emqx_trace:create(InvalidPackets4),
|
||||
?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
|
||||
|
||||
?assertEqual({error, "type required"}, emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}])),
|
||||
?assertEqual({error, "type=[topic,clientid,ip_address] required"},
|
||||
emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
|
||||
|
||||
?assertEqual({error, "incorrect type: only support clientid/topic"},
|
||||
?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"},
|
||||
emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
||||
{<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
||||
|
||||
?assertEqual({error, "ip address: einval"},
|
||||
emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])),
|
||||
ok.
|
||||
|
||||
t_create_default(_Config) ->
|
||||
ok = emqx_trace:clear(),
|
||||
{error, "name required"} = emqx_trace:create([]),
|
||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]),
|
||||
[#emqx_trace{packets = Packets}] = emqx_trace:list(),
|
||||
?assertEqual([], Packets),
|
||||
{<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
|
||||
[#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
|
||||
ok = emqx_trace:clear(),
|
||||
Trace = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
||||
|
@ -186,15 +157,13 @@ t_create_default(_Config) ->
|
|||
Trace2 = [
|
||||
{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>},
|
||||
{<<"start_at">>, to_rfc3339(Now + 10)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 3)}
|
||||
],
|
||||
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
|
||||
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||
[#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
|
||||
?assertEqual(10 * 60, End - Start),
|
||||
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
||||
|
@ -205,8 +174,8 @@ t_update_enable(_Config) ->
|
|||
Name = <<"test-name">>,
|
||||
Now = erlang:system_time(second),
|
||||
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||
ok = emqx_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||
ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||
[#emqx_trace{enable = Enable}] = emqx_trace:list(),
|
||||
?assertEqual(Enable, true),
|
||||
ok = emqx_trace:update(Name, false),
|
||||
|
@ -293,13 +262,10 @@ t_get_log_filename(_Config) ->
|
|||
Start = calendar:system_time_to_rfc3339(Now),
|
||||
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||
Name = <<"name1">>,
|
||||
ClientId = <<"test-device">>,
|
||||
Packets = [atom_to_binary(E) || E <- ?PACKETS],
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"clientid">>},
|
||||
{<<"clientid">>, ClientId},
|
||||
{<<"packets">>, Packets},
|
||||
{<<"type">>, <<"ip_address">>},
|
||||
{<<"ip_address">>, <<"127.0.0.1">>},
|
||||
{<<"start_at">>, list_to_binary(Start)},
|
||||
{<<"end_at">>, list_to_binary(End)}
|
||||
],
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
%% API
|
||||
-export([ list_trace/2
|
||||
, create_trace/2
|
||||
, update_trace/2
|
||||
, disable_trace/2
|
||||
, delete_trace/2
|
||||
, clear_traces/2
|
||||
, download_zip_log/2
|
||||
|
@ -52,11 +52,11 @@
|
|||
func => clear_traces,
|
||||
descr => "clear all traces"}).
|
||||
|
||||
-rest_api(#{name => update_trace,
|
||||
-rest_api(#{name => disable_trace,
|
||||
method => 'PUT',
|
||||
path => "/trace/:bin:name/:atom:operation",
|
||||
func => update_trace,
|
||||
descr => "diable/enable trace"}).
|
||||
path => "/trace/:bin:name/stop",
|
||||
func => disable_trace,
|
||||
descr => "stop trace"}).
|
||||
|
||||
-rest_api(#{name => download_zip_log,
|
||||
method => 'GET',
|
||||
|
@ -82,8 +82,8 @@ delete_trace(Path, Params) ->
|
|||
clear_traces(Path, Params) ->
|
||||
return(emqx_trace_api:clear_traces(Path, Params)).
|
||||
|
||||
update_trace(Path, Params) ->
|
||||
return(emqx_trace_api:update_trace(Path, Params)).
|
||||
disable_trace(#{name := Name}, Params) ->
|
||||
return(emqx_trace_api:update_trace(#{name => Name, operation => disable}, Params)).
|
||||
|
||||
download_zip_log(Path, Params) ->
|
||||
case emqx_trace_api:download_zip_log(Path, Params) of
|
||||
|
|
|
@ -52,14 +52,13 @@ t_http_test(_Config) ->
|
|||
%% create
|
||||
ErrorTrace = #{},
|
||||
{ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace),
|
||||
?assertEqual(#{<<"message">> => <<"unknown field: {}">>,
|
||||
?assertEqual(#{<<"message">> => <<"name required">>,
|
||||
<<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)),
|
||||
|
||||
Name = <<"test-name">>,
|
||||
Trace = [
|
||||
{<<"name">>, Name},
|
||||
{<<"type">>, <<"topic">>},
|
||||
{<<"packets">>, [<<"PUBLISH">>]},
|
||||
{<<"topic">>, <<"/x/y/z">>}
|
||||
],
|
||||
|
||||
|
@ -71,14 +70,23 @@ t_http_test(_Config) ->
|
|||
?assertEqual(Name, maps:get(<<"name">>, Data)),
|
||||
|
||||
%% update
|
||||
{ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}),
|
||||
{ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}),
|
||||
?assertEqual(#{<<"code">> => 0,
|
||||
<<"data">> => #{<<"enable">> => false,
|
||||
<<"name">> => <<"test-name">>}}, json(Update)),
|
||||
|
||||
{ok, List1} = request_api(get, api_path("trace"), Header),
|
||||
#{<<"code">> := 0, <<"data">> := [Data1]} = json(List1),
|
||||
?assertEqual(false, maps:get(<<"enable">>, Data1)),
|
||||
Node = atom_to_binary(node()),
|
||||
?assertMatch(#{
|
||||
<<"status">> := <<"stopped">>,
|
||||
<<"name">> := <<"test-name">>,
|
||||
<<"log_size">> := #{Node := _},
|
||||
<<"start_at">> := _,
|
||||
<<"end_at">> := _,
|
||||
<<"type">> := <<"topic">>,
|
||||
<<"topic">> := <<"/x/y/z">>
|
||||
}, Data1),
|
||||
|
||||
%% delete
|
||||
{ok, Delete} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
|
@ -86,7 +94,7 @@ t_http_test(_Config) ->
|
|||
|
||||
{ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header),
|
||||
?assertEqual(#{<<"code">> => <<"NOT_FOUND">>,
|
||||
<<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)),
|
||||
<<"message">> => <<"test-name NOT FOUND">>}, json(DeleteNotFound)),
|
||||
|
||||
{ok, List2} = request_api(get, api_path("trace"), Header),
|
||||
?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)),
|
||||
|
|
|
@ -126,8 +126,9 @@ subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
|||
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
||||
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
|
||||
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
||||
_ = emqx_tracer:trace_subscribe(Topic, SubId, SubOpts),
|
||||
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
|
||||
_ = emqx_trace:subscribe(Topic, SubId, SubOpts),
|
||||
SubPid = self(),
|
||||
case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
||||
false -> %% New
|
||||
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
||||
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
||||
|
@ -172,7 +173,7 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
|||
SubPid = self(),
|
||||
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
||||
[{_, SubOpts}] ->
|
||||
emqx_tracer:trace_unsubscribe(Topic, SubOpts),
|
||||
emqx_trace:unsubscribe(Topic, SubOpts),
|
||||
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||
do_unsubscribe(Topic, SubPid, SubOpts);
|
||||
[] -> ok
|
||||
|
@ -195,7 +196,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
|
|||
|
||||
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
|
||||
publish(Msg) when is_record(Msg, message) ->
|
||||
_ = emqx_tracer:trace_publish(Msg),
|
||||
_ = emqx_trace:publish(Msg),
|
||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||
#message{headers = #{allow_publish := false}} ->
|
||||
|
|
|
@ -275,7 +275,7 @@ take_ws_cookie(ClientInfo, ConnInfo) ->
|
|||
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
|
||||
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
|
||||
|
||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||
handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
||||
case pipeline([fun enrich_conninfo/2,
|
||||
fun run_conn_hooks/2,
|
||||
fun check_connect/2,
|
||||
|
@ -285,6 +285,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
|||
fun auth_connect/2
|
||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
||||
NChannel1 = NChannel#channel{
|
||||
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
-compile({no_auto_import, [error/1]}).
|
||||
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
%% Logs
|
||||
-export([ debug/1
|
||||
, debug/2
|
||||
|
@ -64,10 +66,11 @@
|
|||
id := logger:handler_id(),
|
||||
level := logger:level(),
|
||||
dst := logger_dst(),
|
||||
filters := [{logger:filter_id(), logger:filter()}],
|
||||
status := started | stopped
|
||||
}).
|
||||
|
||||
-define(stopped_handlers, {?MODULE, stopped_handlers}).
|
||||
-define(STOPPED_HANDLERS, {?MODULE, stopped_handlers}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
|
@ -171,19 +174,19 @@ get_log_handlers() ->
|
|||
|
||||
-spec(get_log_handlers(started | stopped) -> [logger_handler_info()]).
|
||||
get_log_handlers(started) ->
|
||||
[log_hanlder_info(Conf, started) || Conf <- logger:get_handler_config()];
|
||||
[log_handler_info(Conf, started) || Conf <- logger:get_handler_config()];
|
||||
get_log_handlers(stopped) ->
|
||||
[log_hanlder_info(Conf, stopped) || Conf <- list_stopped_handler_config()].
|
||||
[log_handler_info(Conf, stopped) || Conf <- list_stopped_handler_config()].
|
||||
|
||||
-spec(get_log_handler(logger:handler_id()) -> logger_handler_info()).
|
||||
get_log_handler(HandlerId) ->
|
||||
case logger:get_handler_config(HandlerId) of
|
||||
{ok, Conf} ->
|
||||
log_hanlder_info(Conf, started);
|
||||
log_handler_info(Conf, started);
|
||||
{error, _} ->
|
||||
case read_stopped_handler_config(HandlerId) of
|
||||
error -> {error, {not_found, HandlerId}};
|
||||
{ok, Conf} -> log_hanlder_info(Conf, stopped)
|
||||
{ok, Conf} -> log_handler_info(Conf, stopped)
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -245,21 +248,21 @@ parse_transform(AST, _Opts) ->
|
|||
%% Internal Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
log_hanlder_info(#{id := Id, level := Level, module := logger_std_h,
|
||||
config := #{type := Type}}, Status) when
|
||||
log_handler_info(#{id := Id, level := Level, module := logger_std_h,
|
||||
filters := Filters, config := #{type := Type}}, Status) when
|
||||
Type =:= standard_io;
|
||||
Type =:= standard_error ->
|
||||
#{id => Id, level => Level, dst => console, status => Status};
|
||||
log_hanlder_info(#{id := Id, level := Level, module := logger_std_h,
|
||||
config := Config = #{type := file}}, Status) ->
|
||||
#{id => Id, level => Level, status => Status,
|
||||
#{id => Id, level => Level, dst => console, status => Status, filters => Filters};
|
||||
log_handler_info(#{id := Id, level := Level, module := logger_std_h,
|
||||
filters := Filters, config := Config = #{type := file}}, Status) ->
|
||||
#{id => Id, level => Level, status => Status, filters => Filters,
|
||||
dst => maps:get(file, Config, atom_to_list(Id))};
|
||||
|
||||
log_hanlder_info(#{id := Id, level := Level, module := logger_disk_log_h,
|
||||
config := #{file := Filename}}, Status) ->
|
||||
#{id => Id, level => Level, dst => Filename, status => Status};
|
||||
log_hanlder_info(#{id := Id, level := Level, module := _OtherModule}, Status) ->
|
||||
#{id => Id, level => Level, dst => unknown, status => Status}.
|
||||
log_handler_info(#{id := Id, level := Level, module := logger_disk_log_h,
|
||||
filters := Filters, config := #{file := Filename}}, Status) ->
|
||||
#{id => Id, level => Level, dst => Filename, status => Status, filters => Filters};
|
||||
log_handler_info(#{id := Id, level := Level, filters := Filters}, Status) ->
|
||||
#{id => Id, level => Level, dst => unknown, status => Status, filters => Filters}.
|
||||
|
||||
%% set level for all log handlers in one command
|
||||
set_all_log_handlers_level(Level) ->
|
||||
|
@ -281,29 +284,29 @@ rollback([{ID, Level} | List]) ->
|
|||
rollback([]) -> ok.
|
||||
|
||||
save_stopped_handler_config(HandlerId, Config) ->
|
||||
case persistent_term:get(?stopped_handlers, undefined) of
|
||||
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||
undefined ->
|
||||
persistent_term:put(?stopped_handlers, #{HandlerId => Config});
|
||||
persistent_term:put(?STOPPED_HANDLERS, #{HandlerId => Config});
|
||||
ConfList ->
|
||||
persistent_term:put(?stopped_handlers, ConfList#{HandlerId => Config})
|
||||
persistent_term:put(?STOPPED_HANDLERS, ConfList#{HandlerId => Config})
|
||||
end.
|
||||
read_stopped_handler_config(HandlerId) ->
|
||||
case persistent_term:get(?stopped_handlers, undefined) of
|
||||
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||
undefined -> error;
|
||||
ConfList -> maps:find(HandlerId, ConfList)
|
||||
end.
|
||||
remove_stopped_handler_config(HandlerId) ->
|
||||
case persistent_term:get(?stopped_handlers, undefined) of
|
||||
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||
undefined -> ok;
|
||||
ConfList ->
|
||||
case maps:find(HandlerId, ConfList) of
|
||||
error -> ok;
|
||||
{ok, _} ->
|
||||
persistent_term:put(?stopped_handlers, maps:remove(HandlerId, ConfList))
|
||||
persistent_term:put(?STOPPED_HANDLERS, maps:remove(HandlerId, ConfList))
|
||||
end
|
||||
end.
|
||||
list_stopped_handler_config() ->
|
||||
case persistent_term:get(?stopped_handlers, undefined) of
|
||||
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||
undefined -> [];
|
||||
ConfList -> maps:values(ConfList)
|
||||
end.
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_trace_handler).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Tracer]").
|
||||
|
||||
%% APIs
|
||||
-export([ running/0
|
||||
, install/3
|
||||
, install/4
|
||||
, uninstall/1
|
||||
, uninstall/2
|
||||
]).
|
||||
|
||||
%% For logger handler filters callbacks
|
||||
-export([ filter_clientid/2
|
||||
, filter_topic/2
|
||||
, filter_ip_address/2
|
||||
]).
|
||||
|
||||
-type tracer() :: #{
|
||||
name := binary(),
|
||||
type := clientid | topic | ip_address,
|
||||
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
|
||||
}.
|
||||
|
||||
-define(FORMAT,
|
||||
{logger_formatter, #{
|
||||
template => [
|
||||
time, " [", level, "] ",
|
||||
{clientid,
|
||||
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
|
||||
[{peername, [peername, " "], []}]
|
||||
},
|
||||
msg, "\n"
|
||||
],
|
||||
single_line => false,
|
||||
max_size => unlimited,
|
||||
depth => unlimited
|
||||
}}
|
||||
).
|
||||
|
||||
-define(CONFIG(_LogFile_), #{
|
||||
type => halt,
|
||||
file => _LogFile_,
|
||||
max_no_bytes => 512 * 1024 * 1024,
|
||||
overload_kill_enable => true,
|
||||
overload_kill_mem_size => 50 * 1024 * 1024,
|
||||
overload_kill_qlen => 20000,
|
||||
%% disable restart
|
||||
overload_kill_restart_after => infinity
|
||||
}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec install(Name :: binary() | list(),
|
||||
Type :: clientid | topic | ip_address,
|
||||
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
|
||||
Level :: logger:level() | all,
|
||||
LogFilePath :: string()) -> ok | {error, term()}.
|
||||
install(Name, Type, Filter, Level, LogFile) ->
|
||||
Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)},
|
||||
install(Who, Level, LogFile).
|
||||
|
||||
-spec install(Type :: clientid | topic | ip_address,
|
||||
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
|
||||
Level :: logger:level() | all,
|
||||
LogFilePath :: string()) -> ok | {error, term()}.
|
||||
install(Type, Filter, Level, LogFile) ->
|
||||
install(Filter, Type, Filter, Level, LogFile).
|
||||
|
||||
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
|
||||
install(Who, all, LogFile) ->
|
||||
install(Who, debug, LogFile);
|
||||
install(Who, Level, LogFile) ->
|
||||
PrimaryLevel = emqx_logger:get_primary_log_level(),
|
||||
try logger:compare_levels(Level, PrimaryLevel) of
|
||||
lt ->
|
||||
{error,
|
||||
io_lib:format(
|
||||
"Cannot trace at a log level (~s) "
|
||||
"lower than the primary log level (~s)",
|
||||
[Level, PrimaryLevel]
|
||||
)};
|
||||
_GtOrEq ->
|
||||
install_handler(Who, Level, LogFile)
|
||||
catch
|
||||
error:badarg ->
|
||||
{error, {invalid_log_level, Level}}
|
||||
end.
|
||||
|
||||
-spec uninstall(Type :: clientid | topic | ip_address,
|
||||
Name :: binary() | list()) -> ok | {error, term()}.
|
||||
uninstall(Type, Name) ->
|
||||
HandlerId = handler_id(#{type => Type, name => ensure_bin(Name)}),
|
||||
uninstall(HandlerId).
|
||||
|
||||
-spec uninstall(HandlerId :: atom()) -> ok | {error, term()}.
|
||||
uninstall(HandlerId) ->
|
||||
Res = logger:remove_handler(HandlerId),
|
||||
show_prompts(Res, HandlerId, "Stop trace"),
|
||||
Res.
|
||||
|
||||
%% @doc Return all running trace handlers information.
|
||||
-spec running() ->
|
||||
[
|
||||
#{
|
||||
name => binary(),
|
||||
type => topic | clientid | ip_address,
|
||||
id => atom(),
|
||||
filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
|
||||
level => logger:level(),
|
||||
dst => file:filename() | console | unknown
|
||||
}
|
||||
].
|
||||
running() ->
|
||||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||
|
||||
-spec filter_clientid(logger:log_event(), string()) -> logger:log_event() | ignore.
|
||||
filter_clientid(#{meta := #{clientid := ClientId}} = Log, ClientId) -> Log;
|
||||
filter_clientid(_Log, _ExpectId) -> ignore.
|
||||
|
||||
-spec filter_topic(logger:log_event(), string()) -> logger:log_event() | ignore.
|
||||
filter_topic(#{meta := #{topic := Topic}} = Log, TopicFilter) ->
|
||||
case emqx_topic:match(Topic, TopicFilter) of
|
||||
true -> Log;
|
||||
false -> ignore
|
||||
end;
|
||||
filter_topic(_Log, _ExpectId) -> ignore.
|
||||
|
||||
-spec filter_ip_address(logger:log_event(), string()) -> logger:log_event() | ignore.
|
||||
filter_ip_address(#{meta := #{peername := Peername}} = Log, IP) ->
|
||||
case lists:prefix(IP, Peername) of
|
||||
true -> Log;
|
||||
false -> ignore
|
||||
end;
|
||||
filter_ip_address(_Log, _ExpectId) -> ignore.
|
||||
|
||||
install_handler(Who, Level, LogFile) ->
|
||||
HandlerId = handler_id(Who),
|
||||
Config = #{
|
||||
level => Level,
|
||||
formatter => ?FORMAT,
|
||||
filter_default => stop,
|
||||
filters => filters(Who),
|
||||
config => ?CONFIG(LogFile)
|
||||
},
|
||||
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
|
||||
show_prompts(Res, Who, "Start trace"),
|
||||
Res.
|
||||
|
||||
filters(#{type := clientid, filter := Filter}) ->
|
||||
[{clientid, {fun ?MODULE:filter_clientid/2, ensure_list(Filter)}}];
|
||||
filters(#{type := topic, filter := Filter}) ->
|
||||
[{topic, {fun ?MODULE:filter_topic/2, ensure_bin(Filter)}}];
|
||||
filters(#{type := ip_address, filter := Filter}) ->
|
||||
[{ip_address, {fun ?MODULE:filter_ip_address/2, ensure_list(Filter)}}].
|
||||
|
||||
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
||||
Init = #{id => Id, level => Level, dst => Dst},
|
||||
case Filters of
|
||||
[{topic, {_FilterFun, Filter}}] ->
|
||||
<<"trace_topic_", Name/binary>> = atom_to_binary(Id),
|
||||
[Init#{type => topic, filter => Filter, name => Name} | Acc];
|
||||
[{clientid, {_FilterFun, Filter}}] ->
|
||||
<<"trace_clientid_", Name/binary>> = atom_to_binary(Id),
|
||||
[Init#{type => clientid, filter => Filter, name => Name} | Acc];
|
||||
[{ip_address, {_FilterFun, Filter}}] ->
|
||||
<<"trace_ip_address_", Name/binary>> = atom_to_binary(Id),
|
||||
[Init#{type => ip_address, filter => Filter, name => Name} | Acc];
|
||||
_ ->
|
||||
Acc
|
||||
end.
|
||||
|
||||
handler_id(#{type := Type, name := Name}) ->
|
||||
binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>).
|
||||
|
||||
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
|
||||
ensure_bin(Bin) when is_binary(Bin) -> Bin.
|
||||
|
||||
ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||
ensure_list(List) when is_list(List) -> List.
|
||||
|
||||
show_prompts(ok, Who, Msg) ->
|
||||
?LOG(info, Msg ++ " ~p " ++ "successfully~n", [Who]);
|
||||
show_prompts({error, Reason}, Who, Msg) ->
|
||||
?LOG(error, Msg ++ " ~p " ++ "failed by ~p~n", [Who, Reason]).
|
|
@ -1,261 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_tracer).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Tracer]").
|
||||
|
||||
%% APIs
|
||||
-export([ trace_publish/1
|
||||
, trace_subscribe/3
|
||||
, trace_unsubscribe/2
|
||||
, start_trace/3
|
||||
, start_trace/4
|
||||
, lookup_traces/0
|
||||
, stop_trace/3
|
||||
, stop_trace/2
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([is_match/3]).
|
||||
-endif.
|
||||
|
||||
-type(label() :: 'CONNECT' | 'CONNACK' | 'PUBLISH' | 'PUBACK' | 'PUBREC' |
|
||||
'PUBREL' | 'PUBCOMP' | 'SUBSCRIBE' | 'SUBACK' | 'UNSUBSCRIBE' |
|
||||
'UNSUBACK' | 'PINGREQ' | 'PINGRESP' | 'DISCONNECT' | 'AUTH').
|
||||
|
||||
-type(tracer() :: #{name := binary(),
|
||||
type := clientid | topic,
|
||||
clientid => emqx_types:clientid(),
|
||||
topic => emqx_types:topic(),
|
||||
labels := [label()]}).
|
||||
|
||||
-define(TRACER, ?MODULE).
|
||||
-define(FORMAT, {logger_formatter,
|
||||
#{template =>
|
||||
[time, " [", level, "] ",
|
||||
{clientid,
|
||||
[{peername,
|
||||
[clientid, "@", peername, " "],
|
||||
[clientid, " "]}],
|
||||
[{peername,
|
||||
[peername, " "],
|
||||
[]}]},
|
||||
msg, "\n"],
|
||||
single_line => false
|
||||
}}).
|
||||
-define(TOPIC_COMBINATOR, <<"_trace_topic_">>).
|
||||
-define(CLIENTID_COMBINATOR, <<"_trace_clientid_">>).
|
||||
-define(TOPIC_TRACE_ID(T, N),
|
||||
binary_to_atom(<<(N)/binary, ?TOPIC_COMBINATOR/binary, (T)/binary>>)).
|
||||
-define(CLIENT_TRACE_ID(C, N),
|
||||
binary_to_atom(<<(N)/binary, ?CLIENTID_COMBINATOR/binary, (C)/binary>>)).
|
||||
-define(TOPIC_TRACE(T, N, M), {topic, T, N, M}).
|
||||
-define(CLIENT_TRACE(C, N, M), {clientid, C, N, M}).
|
||||
-define(TOPIC_TRACE(T, N), {topic, T, N}).
|
||||
-define(CLIENT_TRACE(C, N), {clientid, C, N}).
|
||||
|
||||
-define(IS_LOG_LEVEL(L),
|
||||
L =:= emergency orelse
|
||||
L =:= alert orelse
|
||||
L =:= critical orelse
|
||||
L =:= error orelse
|
||||
L =:= warning orelse
|
||||
L =:= notice orelse
|
||||
L =:= info orelse
|
||||
L =:= debug).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
trace_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
|
||||
%% Do not trace '$SYS' publish
|
||||
ignore;
|
||||
trace_publish(#message{from = From, topic = Topic, payload = Payload})
|
||||
when is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info(#{topic => Topic,
|
||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} },
|
||||
"PUBLISH to ~s: ~0p", [Topic, Payload]).
|
||||
|
||||
trace_subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
||||
trace_subscribe(Topic, SubId, SubOpts) ->
|
||||
emqx_logger:info(#{topic => Topic,
|
||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||
"~ts SUBSCRIBE ~ts: Options: ~0p", [SubId, Topic, SubOpts]).
|
||||
|
||||
trace_unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
||||
trace_unsubscribe(Topic, SubOpts) ->
|
||||
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||
"~ts UNSUBSCRIBE ~ts: Options: ~0p",
|
||||
[maps:get(subid, SubOpts, ""), Topic, SubOpts]).
|
||||
|
||||
-spec(start_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(),
|
||||
logger:level() | all, string()) -> ok | {error, term()}).
|
||||
start_trace(clientid, ClientId0, Level, LogFile) ->
|
||||
ClientId = ensure_bin(ClientId0),
|
||||
Who = #{type => clientid, clientid => ClientId, name => ClientId, labels => []},
|
||||
start_trace(Who, Level, LogFile);
|
||||
start_trace(topic, Topic0, Level, LogFile) ->
|
||||
Topic = ensure_bin(Topic0),
|
||||
Who = #{type => topic, topic => Topic, name => Topic, labels => []},
|
||||
start_trace(Who, Level, LogFile).
|
||||
|
||||
%% @doc Start to trace clientid or topic.
|
||||
-spec(start_trace(tracer(), logger:level() | all, string()) -> ok | {error, term()}).
|
||||
start_trace(Who, all, LogFile) ->
|
||||
start_trace(Who, debug, LogFile);
|
||||
start_trace(Who, Level, LogFile) ->
|
||||
case ?IS_LOG_LEVEL(Level) of
|
||||
true ->
|
||||
PrimaryLevel = emqx_logger:get_primary_log_level(),
|
||||
try logger:compare_levels(Level, PrimaryLevel) of
|
||||
lt ->
|
||||
{error,
|
||||
io_lib:format("Cannot trace at a log level (~s) "
|
||||
"lower than the primary log level (~s)",
|
||||
[Level, PrimaryLevel])};
|
||||
_GtOrEq ->
|
||||
install_trace_handler(Who, Level, LogFile)
|
||||
catch
|
||||
_:Error ->
|
||||
{error, Error}
|
||||
end;
|
||||
false -> {error, {invalid_log_level, Level}}
|
||||
end.
|
||||
|
||||
-spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic()) ->
|
||||
ok | {error, term()}).
|
||||
stop_trace(Type, ClientIdOrTopic) ->
|
||||
stop_trace(Type, ClientIdOrTopic, ClientIdOrTopic).
|
||||
|
||||
%% @doc Stop tracing clientid or topic.
|
||||
-spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(), binary()) ->
|
||||
ok | {error, term()}).
|
||||
stop_trace(clientid, ClientId, Name) ->
|
||||
Who = #{type => clientid, clientid => ensure_bin(ClientId), name => ensure_bin(Name)},
|
||||
uninstall_trance_handler(Who);
|
||||
stop_trace(topic, Topic, Name) ->
|
||||
Who = #{type => topic, topic => ensure_bin(Topic), name => ensure_bin(Name)},
|
||||
uninstall_trance_handler(Who).
|
||||
|
||||
%% @doc Lookup all traces
|
||||
-spec(lookup_traces() -> [#{ type => topic | clientid,
|
||||
name => binary(),
|
||||
topic => emqx_types:topic(),
|
||||
level => logger:level(),
|
||||
dst => file:filename() | console | unknown
|
||||
}]).
|
||||
lookup_traces() ->
|
||||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||
|
||||
install_trace_handler(Who, Level, LogFile) ->
|
||||
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
||||
#{level => Level,
|
||||
formatter => ?FORMAT,
|
||||
config => #{type => halt, file => LogFile},
|
||||
filter_default => stop,
|
||||
filters => [{meta_key_filter,
|
||||
{fun filter_by_meta_key/2, Who}}]})
|
||||
of
|
||||
ok ->
|
||||
?LOG(info, "Start trace for ~p", [Who]),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
uninstall_trance_handler(Who) ->
|
||||
case logger:remove_handler(handler_id(Who)) of
|
||||
ok ->
|
||||
?LOG(info, "Stop trace for ~p", [Who]),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
filter_traces(#{id := Id, level := Level, dst := Dst}, Acc) ->
|
||||
IdStr = atom_to_binary(Id),
|
||||
case binary:split(IdStr, [?TOPIC_COMBINATOR]) of
|
||||
[Name, Topic] ->
|
||||
[#{ type => topic,
|
||||
name => Name,
|
||||
topic => Topic,
|
||||
level => Level,
|
||||
dst => Dst} | Acc];
|
||||
_ ->
|
||||
case binary:split(IdStr, [?CLIENTID_COMBINATOR]) of
|
||||
[Name, ClientId] ->
|
||||
[#{ type => clientid,
|
||||
name => Name,
|
||||
clientid => ClientId,
|
||||
level => Level,
|
||||
dst => Dst} | Acc];
|
||||
_ -> Acc
|
||||
end
|
||||
end.
|
||||
|
||||
%% Plan to support topic_and_client type, so we have type field.
|
||||
handler_id(#{type := topic, topic := Topic, name := Name}) ->
|
||||
?TOPIC_TRACE_ID(format(Topic), format(Name));
|
||||
handler_id(#{type := clientid, clientid := ClientId, name := Name}) ->
|
||||
?CLIENT_TRACE_ID(format(ClientId), format(Name)).
|
||||
|
||||
filter_by_meta_key(#{meta := Meta, level := Level} = Log, Context) ->
|
||||
case is_match(Context, Meta, Level) of
|
||||
true -> Log;
|
||||
false -> ignore
|
||||
end.
|
||||
|
||||
%% When the log level is higher than debug and clientid/topic is match,
|
||||
%% it will be logged without judging the content inside the labels.
|
||||
%% When the log level is debug, in addition to the matched clientid/topic,
|
||||
%% you also need to determine whether the label is in the labels
|
||||
is_match(#{type := clientid, clientid := ExpectId, labels := Labels},
|
||||
#{clientid := RealId} = Meta,
|
||||
Level) ->
|
||||
is_match(ExpectId =:= iolist_to_binary(RealId), Level, Meta, Labels);
|
||||
is_match(#{type := topic, topic := TopicFilter, labels := Labels},
|
||||
#{topic := Topic} = Meta, Level) ->
|
||||
is_match(emqx_topic:match(Topic, TopicFilter), Level, Meta, Labels);
|
||||
is_match(_, _, _) ->
|
||||
false.
|
||||
|
||||
is_match(true, debug, Meta, Labels) -> is_match_labels(Meta, Labels);
|
||||
is_match(Boolean, _, _Meta, _Labels) -> Boolean.
|
||||
|
||||
is_match_labels(#{trace_label := 'ALL'}, _Context) -> true;
|
||||
is_match_labels(_, []) -> true;
|
||||
is_match_labels(#{trace_label := Packet}, Context) ->
|
||||
lists:member(Packet, Context);
|
||||
is_match_labels(_, _) -> false.
|
||||
|
||||
format(List)when is_list(List) ->
|
||||
format(list_to_binary(List));
|
||||
format(Atom)when is_atom(Atom) ->
|
||||
format(atom_to_list(Atom));
|
||||
format(Bin0)when is_binary(Bin0) ->
|
||||
case byte_size(Bin0) of
|
||||
Size when Size =< 200 -> Bin0;
|
||||
_ -> emqx_misc:bin2hexstr_a_f_upper(Bin0)
|
||||
end.
|
||||
|
||||
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
|
||||
ensure_bin(Bin) when is_binary(Bin) -> Bin.
|
|
@ -0,0 +1,191 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_trace_handler_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-define(CLIENT, [{host, "localhost"},
|
||||
{clientid, <<"client">>},
|
||||
{username, <<"testuser">>},
|
||||
{password, <<"pass">>}
|
||||
]).
|
||||
|
||||
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(t_trace_clientid, Config) ->
|
||||
Config;
|
||||
init_per_testcase(_Case, Config) ->
|
||||
ok = emqx_logger:set_log_level(debug),
|
||||
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
|
||||
Config.
|
||||
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
ok = emqx_logger:set_log_level(warning),
|
||||
ok.
|
||||
|
||||
t_trace_clientid(_Config) ->
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(error),
|
||||
{error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"),
|
||||
emqx_logger:set_log_level(debug),
|
||||
%% add list clientid
|
||||
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
|
||||
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
|
||||
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
|
||||
{error, {invalid_log_level, bad_level}} =
|
||||
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
||||
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
||||
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
||||
ct:sleep(100),
|
||||
|
||||
%% Verify the tracing file exits
|
||||
?assert(filelib:is_regular("tmp/client.log")),
|
||||
?assert(filelib:is_regular("tmp/client2.log")),
|
||||
?assert(filelib:is_regular("tmp/client3.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([#{type => clientid, filter => "client", name => <<"client">>,
|
||||
id => trace_clientid_client, level => debug, dst => "tmp/client.log"},
|
||||
#{type => clientid, filter => "client2", name => <<"client2">>,
|
||||
id => trace_clientid_client2, level => debug, dst => "tmp/client2.log"},
|
||||
#{type => clientid, filter => "client3", name => <<"client3">>,
|
||||
id => trace_clientid_client3, level => debug, dst => "tmp/client3.log"}
|
||||
], emqx_trace_handler:running()),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
|
||||
{ok, T} = emqtt:start_link(?CLIENT),
|
||||
emqtt:connect(T),
|
||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||
emqtt:ping(T),
|
||||
ct:sleep(200),
|
||||
|
||||
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
||||
{ok, Bin} = file:read_file("tmp/client.log"),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])),
|
||||
?assert(filelib:file_size("tmp/client2.log") == 0),
|
||||
|
||||
%% Stop tracing
|
||||
ok = emqx_trace_handler:uninstall(clientid, <<"client">>),
|
||||
ok = emqx_trace_handler:uninstall(clientid, <<"client2">>),
|
||||
ok = emqx_trace_handler:uninstall(clientid, <<"client3">>),
|
||||
|
||||
emqtt:disconnect(T),
|
||||
?assertEqual([], emqx_trace_handler:running()).
|
||||
|
||||
t_trace_topic(_Config) ->
|
||||
{ok, T} = emqtt:start_link(?CLIENT),
|
||||
emqtt:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(debug),
|
||||
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
||||
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
||||
ct:sleep(100),
|
||||
|
||||
%% Verify the tracing file exits
|
||||
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
|
||||
?assert(filelib:is_regular("tmp/topic_trace_y.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([#{type => topic, filter => <<"x/#">>, id => 'trace_topic_x/#',
|
||||
level => debug, dst => "tmp/topic_trace_x.log", name => <<"x/#">>},
|
||||
#{type => topic, filter => <<"y/#">>, id => 'trace_topic_y/#',
|
||||
name => <<"y/#">>, level => debug, dst => "tmp/topic_trace_y.log"}
|
||||
],
|
||||
emqx_trace_handler:running()),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||
emqtt:subscribe(T, <<"x/y/z">>),
|
||||
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
|
||||
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
|
||||
|
||||
%% Stop tracing
|
||||
ok = emqx_trace_handler:uninstall(topic, <<"x/#">>),
|
||||
ok = emqx_trace_handler:uninstall(topic, <<"y/#">>),
|
||||
{error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
|
||||
?assertEqual([], emqx_trace_handler:running()),
|
||||
emqtt:disconnect(T).
|
||||
|
||||
t_trace_ip_address(_Config) ->
|
||||
{ok, T} = emqtt:start_link(?CLIENT),
|
||||
emqtt:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
||||
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
||||
ct:sleep(100),
|
||||
|
||||
%% Verify the tracing file exits
|
||||
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
||||
?assert(filelib:is_regular("tmp/ip_trace_y.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([#{type => ip_address, filter => "127.0.0.1",
|
||||
id => 'trace_ip_address_127.0.0.1', name => <<"127.0.0.1">>,
|
||||
level => debug, dst => "tmp/ip_trace_x.log"},
|
||||
#{type => ip_address, filter => "192.168.1.1",
|
||||
id => 'trace_ip_address_192.168.1.1', name => <<"192.168.1.1">>,
|
||||
level => debug, dst => "tmp/ip_trace_y.log"}
|
||||
],
|
||||
emqx_trace_handler:running()),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||
emqtt:subscribe(T, <<"x/y/z">>),
|
||||
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
|
||||
?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
|
||||
|
||||
%% Stop tracing
|
||||
ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>),
|
||||
ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>),
|
||||
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
|
||||
emqtt:disconnect(T),
|
||||
?assertEqual([], emqx_trace_handler:running()).
|
|
@ -1,171 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_tracer_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-define(CLIENT, [{host, "localhost"},
|
||||
{clientid, <<"client">>},
|
||||
{username, <<"testuser">>},
|
||||
{password, <<"pass">>}
|
||||
]).
|
||||
|
||||
all() -> [t_trace_clientid, t_trace_topic, t_is_match].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_trace_clientid(_Config) ->
|
||||
{ok, T} = emqtt:start_link(?CLIENT),
|
||||
emqtt:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(error),
|
||||
{error, _} = emqx_tracer:start_trace(clientid, <<"client">>, debug, "tmp/client.log"),
|
||||
emqx_logger:set_log_level(debug),
|
||||
%% add list clientid
|
||||
ok = emqx_tracer:start_trace(clientid, "client", debug, "tmp/client.log"),
|
||||
ok = emqx_tracer:start_trace(clientid, <<"client2">>, all, "tmp/client2.log"),
|
||||
ok = emqx_tracer:start_trace(clientid, <<"client3">>, all, "tmp/client3.log"),
|
||||
{error, {invalid_log_level, bad_level}} =
|
||||
emqx_tracer:start_trace(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
||||
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
||||
emqx_tracer:start_trace(clientid, <<"client5">>, debug, "."),
|
||||
ct:sleep(100),
|
||||
|
||||
%% Verify the tracing file exits
|
||||
?assert(filelib:is_regular("tmp/client.log")),
|
||||
?assert(filelib:is_regular("tmp/client2.log")),
|
||||
?assert(filelib:is_regular("tmp/client3.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([#{type => clientid, clientid => <<"client">>,
|
||||
name => <<"client">>, level => debug, dst => "tmp/client.log"},
|
||||
#{type => clientid, clientid => <<"client2">>,
|
||||
name => <<"client2">>, level => debug, dst => "tmp/client2.log"},
|
||||
#{type => clientid, clientid => <<"client3">>,
|
||||
name => <<"client3">>, level => debug, dst => "tmp/client3.log"}
|
||||
], emqx_tracer:lookup_traces()),
|
||||
|
||||
%% set the overall log level to debug
|
||||
emqx_logger:set_log_level(debug),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
|
||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||
ct:sleep(200),
|
||||
|
||||
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
||||
?assert(filelib:file_size("tmp/client.log") > 0),
|
||||
?assert(filelib:file_size("tmp/client2.log") == 0),
|
||||
|
||||
%% Stop tracing
|
||||
ok = emqx_tracer:stop_trace(clientid, <<"client">>),
|
||||
ok = emqx_tracer:stop_trace(clientid, <<"client2">>),
|
||||
ok = emqx_tracer:stop_trace(clientid, <<"client3">>),
|
||||
emqtt:disconnect(T),
|
||||
|
||||
emqx_logger:set_log_level(warning).
|
||||
|
||||
t_trace_topic(_Config) ->
|
||||
{ok, T} = emqtt:start_link(?CLIENT),
|
||||
emqtt:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(debug),
|
||||
ok = emqx_tracer:start_trace(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
||||
ok = emqx_tracer:start_trace(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
||||
ct:sleep(100),
|
||||
|
||||
%% Verify the tracing file exits
|
||||
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
|
||||
?assert(filelib:is_regular("tmp/topic_trace_y.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([#{type => topic, topic => <<"x/#">>, name => <<"x/#">>,
|
||||
level => debug, dst => "tmp/topic_trace_x.log"},
|
||||
#{type => topic, topic => <<"y/#">>, name => <<"y/#">>,
|
||||
level => debug, dst => "tmp/topic_trace_y.log"}],
|
||||
emqx_tracer:lookup_traces()),
|
||||
|
||||
%% set the overall log level to debug
|
||||
emqx_logger:set_log_level(debug),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
|
||||
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||
ct:sleep(200),
|
||||
|
||||
?assert(filelib:file_size("tmp/topic_trace_x.log") > 0),
|
||||
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
|
||||
|
||||
%% Stop tracing
|
||||
ok = emqx_tracer:stop_trace(topic, <<"x/#">>),
|
||||
ok = emqx_tracer:stop_trace(topic, <<"y/#">>),
|
||||
{error, _Reason} = emqx_tracer:stop_trace(topic, <<"z/#">>),
|
||||
emqtt:disconnect(T),
|
||||
|
||||
emqx_logger:set_log_level(warning).
|
||||
|
||||
t_is_match(_Config) ->
|
||||
ClientId = <<"test">>,
|
||||
?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []},
|
||||
#{clientid => ClientId}, warning)),
|
||||
?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []},
|
||||
#{clientid => ClientId}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid,
|
||||
labels => ['PUBLISH']}, #{clientid => ClientId}, debug)),
|
||||
?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []},
|
||||
#{clientid => ClientId, trace_label => 'PUBLISH'}, debug)),
|
||||
?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['PUBLISH']},
|
||||
#{clientid => ClientId, trace_label => 'PUBLISH'}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']},
|
||||
#{clientid => ClientId, trace_label => 'PUBLISH'}, debug)),
|
||||
?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']},
|
||||
#{clientid => ClientId, trace_label => 'ALL'}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []},
|
||||
#{clientid => <<"Bad">>}, warning)),
|
||||
?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []},
|
||||
#{clientid => <<"Bad">>, trace_label => 'PUBLISH'}, debug)),
|
||||
|
||||
Topic = <<"/test/#">>,
|
||||
?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []},
|
||||
#{topic => <<"/test/1">>}, warning)),
|
||||
?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []},
|
||||
#{topic => <<"/test/1/2">>}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBSCRIBE']},
|
||||
#{topic => <<"/test/1/2">>}, debug)),
|
||||
?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []},
|
||||
#{topic => <<"/test/3">>, trace_label => 'PUBLISH'}, debug)),
|
||||
?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']},
|
||||
#{topic => <<"/test/398/">>, trace_label => 'PUBLISH'}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBACK']},
|
||||
#{topic => <<"/test/1/xy/y">>, trace_label => 'PUBLISH'}, debug)),
|
||||
|
||||
?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']},
|
||||
#{topic => <<"/t1est/398/">>, trace_label => 'PUBLISH'}, debug)),
|
||||
?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []},
|
||||
#{topic => <<"/t1est/1/xy/y">>, trace_label => 'PUBLISH'}, debug)),
|
||||
ok.
|
Loading…
Reference in New Issue