diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 97cb182e6..b6468fa7c 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -21,7 +21,7 @@ -include("emqx_mgmt.hrl"). --elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_mgmt_cli]}}]). +-elvis([{elvis_style, invalid_dynamic_call, disable}]). -define(PRINT_CMD(Cmd, Desc), io:format("~-48s# ~s~n", [Cmd, Desc])). @@ -38,6 +38,7 @@ , vm/1 , mnesia/1 , trace/1 + , traces/1 , log/1 , mgmt/1 , data/1 @@ -421,39 +422,43 @@ log(_) -> trace(["list"]) -> lists:foreach(fun(Trace) -> - #{type := Type, level := Level, dst := Dst} = Trace, - Who = maps:get(Type, Trace), - emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Who, Level, Dst]) - end, emqx_tracer:lookup_traces()); + #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, + emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) + end, emqx_trace_handler:running()); -trace(["stop", "client", ClientId]) -> - trace_off(clientid, ClientId); +trace(["stop", Operation, ClientId]) -> + case trace_type(Operation) of + {ok, Type} -> trace_off(Type, ClientId); + error -> trace([]) + end; -trace(["start", "client", ClientId, LogFile]) -> - trace_on(clientid, ClientId, all, LogFile); +trace(["start", Operation, ClientId, LogFile]) -> + trace(["start", Operation, ClientId, LogFile, "all"]); -trace(["start", "client", ClientId, LogFile, Level]) -> - trace_on(clientid, ClientId, list_to_atom(Level), LogFile); - -trace(["stop", "topic", Topic]) -> - trace_off(topic, Topic); - -trace(["start", "topic", Topic, LogFile]) -> - trace_on(topic, Topic, all, LogFile); - -trace(["start", "topic", Topic, LogFile, Level]) -> - trace_on(topic, Topic, list_to_atom(Level), LogFile); +trace(["start", Operation, ClientId, LogFile, Level]) -> + case trace_type(Operation) of + {ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile); + error -> trace([]) + end; trace(_) -> - emqx_ctl:usage([{"trace list", "List all traces started"}, - {"trace start client []", "Traces for a client"}, - {"trace stop client ", "Stop tracing for a client"}, - {"trace start topic [] ", "Traces for a topic"}, - {"trace stop topic ", "Stop tracing for a topic"}]). + emqx_ctl:usage([{"trace list", "List all traces started on local node"}, + {"trace start client []", + "Traces for a client on local node"}, + {"trace stop client ", + "Stop tracing for a client on local node"}, + {"trace start topic [] ", + "Traces for a topic on local node"}, + {"trace stop topic ", + "Stop tracing for a topic on local node"}, + {"trace start ip_address [] ", + "Traces for a client ip on local node"}, + {"trace stop ip_addresss ", + "Stop tracing for a client ip on local node"} + ]). --dialyzer({nowarn_function, [trace_on/4, trace_off/2]}). trace_on(Who, Name, Level, LogFile) -> - case emqx_tracer:start_trace(Who, Name, Level, LogFile) of + case emqx_trace_handler:install(Who, Name, Level, LogFile) of ok -> emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]); {error, Error} -> @@ -461,13 +466,94 @@ trace_on(Who, Name, Level, LogFile) -> end. trace_off(Who, Name) -> - case emqx_tracer:stop_trace(Who, Name) of + case emqx_trace_handler:uninstall(Who, Name) of ok -> emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]); {error, Error} -> emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error]) end. +%%-------------------------------------------------------------------- +%% @doc Trace Cluster Command +traces(["list"]) -> + {ok, List} = emqx_trace_api:list_trace(get, []), + case List of + [] -> + emqx_ctl:print("Cluster Trace is empty~n", []); + _ -> + lists:foreach(fun(Trace) -> + #{type := Type, name := Name, status := Status, + log_size := LogSize} = Trace, + emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n", + [Name, Type, maps:get(Type, Trace), Status, LogSize]) + end, List) + end, + length(List); + +traces(["stop", Name]) -> + trace_cluster_off(Name); + +traces(["delete", Name]) -> + trace_cluster_del(Name); + +traces(["start", Name, Operation, Filter]) -> + traces(["start", Name, Operation, Filter, "900"]); + +traces(["start", Name, Operation, Filter, DurationS]) -> + case trace_type(Operation) of + {ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS); + error -> traces([]) + end; + +traces(_) -> + emqx_ctl:usage([{"traces list", "List all cluster traces started"}, + {"traces start client ", "Traces for a client in cluster"}, + {"traces start topic ", "Traces for a topic in cluster"}, + {"traces start ip_address ", "Traces for a IP in cluster"}, + {"traces stop ", "Stop trace in cluster"}, + {"traces delete ", "Delete trace in cluster"} + ]). + +trace_cluster_on(Name, Type, Filter, DurationS0) -> + case erlang:whereis(emqx_trace) of + undefined -> + emqx_ctl:print("[error] Tracer module not started~n" + "Please run `emqx_ctl modules start tracer` " + "or `emqx_ctl modules start emqx_mod_trace` first~n", []); + _ -> + DurationS = list_to_integer(DurationS0), + Now = erlang:system_time(second), + Trace = #{ name => list_to_binary(Name) + , type => atom_to_binary(Type) + , Type => list_to_binary(Filter) + , start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)) + , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) + }, + case emqx_trace:create(Trace) of + ok -> + emqx_ctl:print("Cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]); + {error, Error} -> + emqx_ctl:print("[error] Cluster_trace ~s ~s=~s ~p~n", + [Name, Type, Filter, Error]) + end + end. + +trace_cluster_del(Name) -> + case emqx_trace:delete(list_to_binary(Name)) of + ok -> emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]); + {error, Error} -> emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error]) + end. + +trace_cluster_off(Name) -> + case emqx_trace:update(list_to_binary(Name), false) of + ok -> emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]); + {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) + end. + +trace_type("client") -> {ok, clientid}; +trace_type("topic") -> {ok, topic}; +trace_type("ip_address") -> {ok, ip_address}; +trace_type(_) -> error. %%-------------------------------------------------------------------- %% @doc Listeners Command diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 77d46b744..434e96e21 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -45,6 +45,7 @@ groups() -> t_vm_cmd, t_plugins_cmd, t_trace_cmd, + t_traces_cmd, t_broker_cmd, t_router_cmd, t_subscriptions_cmd, @@ -64,6 +65,23 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps(apps()). +init_per_testcase(t_plugins_cmd, Config) -> + meck:new(emqx_plugins, [non_strict, passthrough]), + meck:expect(emqx_plugins, load, fun(_) -> ok end), + meck:expect(emqx_plugins, unload, fun(_) -> ok end), + meck:expect(emqx_plugins, reload, fun(_) -> ok end), + mock_print(), + Config; +init_per_testcase(_Case, Config) -> + mock_print(), + Config. + +end_per_testcase(t_plugins_cmd, _Config) -> + meck:unload(emqx_plugins), + unmock_print(); +end_per_testcase(_Case, _Config) -> + unmock_print(). + t_app(_Config) -> {ok, AppSecret} = emqx_mgmt_auth:add_app(<<"app_id">>, <<"app_name">>), ?assert(emqx_mgmt_auth:is_authorized(<<"app_id">>, AppSecret)), @@ -96,7 +114,6 @@ t_app(_Config) -> ok. t_log_cmd(_) -> - mock_print(), lists:foreach(fun(Level) -> emqx_mgmt_cli:log(["primary-level", Level]), ?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["primary-level"])) @@ -109,12 +126,9 @@ t_log_cmd(_) -> ?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["handlers", "set-level", atom_to_list(Id), Level])) end, ?LOG_LEVELS) - || #{id := Id} <- emqx_logger:get_log_handlers()], - meck:unload(). + || #{id := Id} <- emqx_logger:get_log_handlers()]. t_mgmt_cmd(_) -> - % ct:pal("start testing the mgmt command"), - mock_print(), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt( ["lookup", "emqx_appid"]), "Not Found.")), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt( @@ -127,28 +141,19 @@ t_mgmt_cmd(_) -> ["update", "emqx_appid", "ts"]), "update successfully")), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt( ["delete", "emqx_appid"]), "ok")), - ok = emqx_mgmt_cli:mgmt(["list"]), - meck:unload(). + ok = emqx_mgmt_cli:mgmt(["list"]). t_status_cmd(_) -> - % ct:pal("start testing status command"), - mock_print(), %% init internal status seem to be always 'starting' when running ct tests - ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")), - meck:unload(). + ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")). t_broker_cmd(_) -> - % ct:pal("start testing the broker command"), - mock_print(), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([]), "sysdescr")), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["stats"]), "subscriptions.shared")), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["metrics"]), "bytes.sent")), - ?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([undefined]), "broker")), - meck:unload(). + ?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([undefined]), "broker")). t_clients_cmd(_) -> - % ct:pal("start testing the client command"), - mock_print(), process_flag(trap_exit, true), {ok, T} = emqtt:start_link([{clientid, <<"client12">>}, {username, <<"testuser1">>}, @@ -164,7 +169,6 @@ t_clients_cmd(_) -> receive {'EXIT', T, _} -> ok - % ct:pal("Connection closed: ~p~n", [Reason]) after 500 -> erlang:error("Client is not kick") @@ -179,10 +183,11 @@ t_clients_cmd(_) -> {ok, Connack, <<>>, _} = raw_recv_pase(Bin), timer:sleep(300), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "client13")), - meck:unload(). + % emqx_mgmt_cli:clients(["kick", "client13"]), % timer:sleep(500), % ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "Not Found")). + ok. raw_recv_pase(Packet) -> emqx_frame:parse(Packet). @@ -191,8 +196,6 @@ raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). t_vm_cmd(_) -> - % ct:pal("start testing the vm command"), - mock_print(), [[?assertMatch({match, _}, re:run(Result, Name)) || Result <- emqx_mgmt_cli:vm([Name])] || Name <- ["load", "memory", "process", "io", "ports"]], @@ -205,12 +208,9 @@ t_vm_cmd(_) -> [?assertMatch({match, _}, re:run(Result, "io")) || Result <- emqx_mgmt_cli:vm(["io"])], [?assertMatch({match, _}, re:run(Result, "ports")) - || Result <- emqx_mgmt_cli:vm(["ports"])], - unmock_print(). + || Result <- emqx_mgmt_cli:vm(["ports"])]. t_trace_cmd(_) -> - % ct:pal("start testing the trace command"), - mock_print(), logger:set_primary_config(level, debug), {ok, T} = emqtt:start_link([{clientid, <<"client">>}, {username, <<"testuser">>}, @@ -237,12 +237,34 @@ t_trace_cmd(_) -> Trace7 = emqx_mgmt_cli:trace(["start", "topic", "a/b/c", "log/clientid_trace.log", "error"]), ?assertMatch({match, _}, re:run(Trace7, "successfully")), - logger:set_primary_config(level, error), - unmock_print(). + logger:set_primary_config(level, error). + +t_traces_cmd(_) -> + emqx_trace:mnesia(boot), + Count1 = emqx_mgmt_cli:traces(["list"]), + ?assertEqual(0, Count1), + Error1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]), + ?assertMatch({match, _}, re:run(Error1, "Tracer module not started")), + emqx_trace:start_link(), + Trace1 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]), + ?assertMatch({match, _}, re:run(Trace1, "successfully")), + Count2 = emqx_mgmt_cli:traces(["list"]), + ?assertEqual(1, Count2), + Error2 = emqx_mgmt_cli:traces(["start", "test-name", "client", "clientid-dev"]), + ?assertMatch({match, _}, re:run(Error2, "already_existed")), + Trace2 = emqx_mgmt_cli:traces(["stop", "test-name"]), + ?assertMatch({match, _}, re:run(Trace2, "successfully")), + Count3 = emqx_mgmt_cli:traces(["list"]), + ?assertEqual(1, Count3), + Trace3 = emqx_mgmt_cli:traces(["delete", "test-name"]), + ?assertMatch({match, _}, re:run(Trace3, "successfully")), + Count4 = emqx_mgmt_cli:traces(["list"]), + ?assertEqual(0, Count4), + Error3 = emqx_mgmt_cli:traces(["delete", "test-name"]), + ?assertMatch({match, _}, re:run(Error3, "not_found")), + ok. t_router_cmd(_) -> - % ct:pal("start testing the router command"), - mock_print(), {ok, T} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"testuser1">>}, {password, <<"pass1">>} @@ -257,12 +279,9 @@ t_router_cmd(_) -> emqtt:connect(T1), emqtt:subscribe(T1, <<"a/b/c/d">>), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["list"]), "a/b/c | a/b/c")), - ?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["show", "a/b/c"]), "a/b/c")), - unmock_print(). + ?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["show", "a/b/c"]), "a/b/c")). t_subscriptions_cmd(_) -> - % ct:pal("Start testing the subscriptions command"), - mock_print(), {ok, T3} = emqtt:start_link([{clientid, <<"client">>}, {username, <<"testuser">>}, {password, <<"pass">>} @@ -273,22 +292,18 @@ t_subscriptions_cmd(_) -> [?assertMatch({match, _} , re:run(Result, "b/b/c")) || Result <- emqx_mgmt_cli:subscriptions(["show", <<"client">>])], ?assertEqual(emqx_mgmt_cli:subscriptions(["add", "client", "b/b/c", "0"]), "ok~n"), - ?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"), - unmock_print(). + ?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"). t_listeners_cmd_old(_) -> ok = emqx_listeners:ensure_all_started(), - mock_print(), ?assertEqual(emqx_mgmt_cli:listeners([]), ok), ?assertEqual( "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n", emqx_mgmt_cli:listeners(["stop", "wss", "8084"]) - ), - unmock_print(). + ). t_listeners_cmd_new(_) -> ok = emqx_listeners:ensure_all_started(), - mock_print(), ?assertEqual(emqx_mgmt_cli:listeners([]), ok), ?assertEqual( "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n", @@ -304,16 +319,11 @@ t_listeners_cmd_new(_) -> ), ?assertEqual( emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]), - "Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n" - ), - unmock_print(). + "Failed to restart bad:listener:identifier listener: " + "{no_such_listener,\"bad:listener:identifier\"}\n" + ). t_plugins_cmd(_) -> - mock_print(), - meck:new(emqx_plugins, [non_strict, passthrough]), - meck:expect(emqx_plugins, load, fun(_) -> ok end), - meck:expect(emqx_plugins, unload, fun(_) -> ok end), - meck:expect(emqx_plugins, reload, fun(_) -> ok end), ?assertEqual(emqx_mgmt_cli:plugins(["list"]), ok), ?assertEqual( emqx_mgmt_cli:plugins(["unload", "emqx_auth_mnesia"]), @@ -326,11 +336,9 @@ t_plugins_cmd(_) -> ?assertEqual( emqx_mgmt_cli:plugins(["unload", "emqx_management"]), "Plugin emqx_management can not be unloaded.~n" - ), - unmock_print(). + ). t_cli(_) -> - mock_print(), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([""]), "status")), [?assertMatch({match, _}, re:run(Value, "broker")) || Value <- emqx_mgmt_cli:broker([""])], @@ -352,9 +360,10 @@ t_cli(_) -> || Value <- emqx_mgmt_cli:mnesia([""])], [?assertMatch({match, _}, re:run(Value, "trace")) || Value <- emqx_mgmt_cli:trace([""])], + [?assertMatch({match, _}, re:run(Value, "traces")) + || Value <- emqx_mgmt_cli:traces([""])], [?assertMatch({match, _}, re:run(Value, "mgmt")) - || Value <- emqx_mgmt_cli:mgmt([""])], - unmock_print(). + || Value <- emqx_mgmt_cli:mgmt([""])]. mock_print() -> catch meck:unload(emqx_ctl), diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl index eb2ed7276..eb41703fa 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl @@ -18,10 +18,9 @@ -behaviour(gen_server). -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). --logger_header("[Trace]"). +-logger_header("[Tracer]"). %% Mnesia bootstrap -export([mnesia/1]). @@ -29,6 +28,11 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +-export([ publish/1 + , subscribe/3 + , unsubscribe/2 + ]). + -export([ start_link/0 , list/0 , list/1 @@ -41,6 +45,7 @@ -export([ format/1 , zip_dir/0 + , filename/2 , trace_dir/0 , trace_file/1 , delete_files_after_send/2 @@ -49,23 +54,22 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TRACE, ?MODULE). --define(PACKETS, tuple_to_list(?TYPE_NAMES)). -define(MAX_SIZE, 30). -ifdef(TEST). -export([log_file/2]). -endif. +-export_type([ip_address/0]). +-type ip_address() :: string(). + -record(?TRACE, { name :: binary() | undefined | '_' - , type :: clientid | topic | undefined | '_' - , topic :: emqx_types:topic() | undefined | '_' - , clientid :: emqx_types:clientid() | undefined | '_' - , packets = [] :: list() | '_' + , type :: clientid | topic | ip_address | undefined | '_' + , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_' , enable = true :: boolean() | '_' - , start_at :: integer() | undefined | binary() | '_' - , end_at :: integer() | undefined | binary() | '_' - , log_size = #{} :: map() | '_' + , start_at :: integer() | undefined | '_' + , end_at :: integer() | undefined | '_' }). mnesia(boot) -> @@ -77,6 +81,31 @@ mnesia(boot) -> mnesia(copy) -> ok = ekka_mnesia:copy_table(?TRACE, disc_copies). +publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; +publish(#message{from = From, topic = Topic, payload = Payload}) when + is_binary(From); is_atom(From) -> + emqx_logger:info( + #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, + "PUBLISH to ~s: ~0p", + [Topic, Payload] + ). + +subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; +subscribe(Topic, SubId, SubOpts) -> + emqx_logger:info( + #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, + "~ts SUBSCRIBE ~ts: Options: ~0p", + [SubId, Topic, SubOpts] + ). + +unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; +unsubscribe(Topic, SubOpts) -> + emqx_logger:info( + #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, + "~ts UNSUBSCRIBE ~ts: Options: ~0p", + [maps:get(subid, SubOpts, ""), Topic, SubOpts] + ). + -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -89,18 +118,18 @@ list() -> list(Enable) -> ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). --spec create([{Key :: binary(), Value :: binary()}]) -> +-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) -> ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> case to_trace(Trace) of - {ok, TraceRec} -> create_new_trace(TraceRec); + {ok, TraceRec} -> insert_new_trace(TraceRec); {error, Reason} -> {error, Reason} end; false -> - {error, """The number of traces created has reached the maximum, -please delete the useless ones first"""} + {error, "The number of traces created has reached the maximum" + " please delete the useless ones first"} end. -spec delete(Name :: binary()) -> ok | {error, not_found}. @@ -163,12 +192,8 @@ delete_files_after_send(TraceLog, Zips) -> -spec format(list(#?TRACE{})) -> list(map()). format(Traces) -> Fields = record_info(fields, ?TRACE), - lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) -> - Trace = Trace0#?TRACE{ - start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)), - end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt)) - }, - [_ | Values] = tuple_to_list(Trace), + lists:map(fun(Trace0 = #?TRACE{}) -> + [_ | Values] = tuple_to_list(Trace0), maps:from_list(lists:zip(Fields, Values)) end, Traces). @@ -198,7 +223,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor case maps:take(Pid, Monitors) of error -> {noreply, State}; {Files, NewMonitors} -> - lists:foreach(fun(F) -> file:delete(F) end, Files), + lists:foreach(fun file:delete/1, Files), {noreply, State#{monitors => NewMonitors}} end; handle_info({timeout, TRef, update_trace}, @@ -227,14 +252,12 @@ terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> code_change(_, State, _Extra) -> {ok, State}. -create_new_trace(Trace) -> +insert_new_trace(Trace) -> Tran = fun() -> case mnesia:read(?TRACE, Trace#?TRACE.name) of [] -> - #?TRACE{start_at = StartAt, topic = Topic, - clientid = ClientId, packets = Packets} = Trace, - Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic, - clientid = ClientId, packets = Packets}, + #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace, + Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter}, case mnesia:match_object(?TRACE, Match, read) of [] -> mnesia:write(?TRACE, Trace, write); [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) @@ -248,7 +271,7 @@ update_trace(Traces) -> Now = erlang:system_time(second), {_Waiting, Running, Finished} = classify_by_time(Traces, Now), disable_finished(Finished), - Started = already_running(), + Started = emqx_trace_handler:running(), {NeedRunning, AllStarted} = start_trace(Running, Started), NeedStop = AllStarted -- NeedRunning, ok = stop_trace(NeedStop, Started), @@ -257,14 +280,8 @@ update_trace(Traces) -> emqx_misc:start_timer(NextTime, update_trace). stop_all_trace_handler() -> - lists:foreach(fun(#{type := Type, name := Name} = Trace) -> - _ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name) - end - , already_running()). - -already_running() -> - emqx_tracer:lookup_traces(). - + lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, + emqx_trace_handler:running()). get_enable_trace() -> {atomic, Traces} = mnesia:transaction(fun() -> @@ -286,31 +303,24 @@ find_closest_time(Traces, Now) -> disable_finished([]) -> ok; disable_finished(Traces) -> - NameWithLogSize = - lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) -> - FileSize = filelib:file_size(log_file(Name, StartAt)), - {Name, FileSize} end, Traces), transaction(fun() -> - lists:map(fun({Name, LogSize}) -> + lists:map(fun(#?TRACE{name = Name}) -> case mnesia:read(?TRACE, Name, write) of [] -> ok; - [Trace = #?TRACE{log_size = Logs}] -> - mnesia:write(?TRACE, Trace#?TRACE{enable = false, - log_size = Logs#{node() => LogSize}}, write) - end end, NameWithLogSize) + [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write) + end end, Traces) end). start_trace(Traces, Started0) -> Started = lists:map(fun(#{name := Name}) -> Name end, Started0), lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> case lists:member(Name, StartedAcc) of - true -> {[Name | Running], StartedAcc}; + true -> + {[Name | Running], StartedAcc}; false -> case start_trace(Trace) of ok -> {[Name | Running], [Name | StartedAcc]}; - Error -> - ?LOG(error, "(~p)start trace failed by:~p", [Name, Error]), - {[Name | Running], StartedAcc} + {error, _Reason} -> {[Name | Running], StartedAcc} end end end, {[], Started}, Traces). @@ -318,27 +328,16 @@ start_trace(Traces, Started0) -> start_trace(Trace) -> #?TRACE{name = Name , type = Type - , clientid = ClientId - , topic = Topic - , packets = Packets + , filter = Filter , start_at = Start } = Trace, - Who0 = #{name => Name, labels => Packets}, - Who = - case Type of - topic -> Who0#{type => topic, topic => Topic}; - clientid -> Who0#{type => clientid, clientid => ClientId} - end, - case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of - ok -> ok; - {error, {already_exist, _}} -> ok; - {error, Reason} -> {error, Reason} - end. + Who = #{name => Name, type => Type, filter => Filter}, + emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> - lists:foreach(fun(#{name := Name, type := Type} = Trace) -> + lists:foreach(fun(#{name := Name, type := Type}) -> case lists:member(Name, Finished) of - true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name); + true -> emqx_trace_handler:uninstall(Type, Name); false -> ok end end, Started). @@ -371,23 +370,31 @@ classify_by_time([Trace = #?TRACE{end_at = End} | Traces], classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). -to_trace(TraceList) -> - case to_trace(TraceList, #?TRACE{}) of +to_trace(TraceParam) -> + case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of {error, Reason} -> {error, Reason}; {ok, #?TRACE{name = undefined}} -> {error, "name required"}; {ok, #?TRACE{type = undefined}} -> - {error, "type required"}; - {ok, #?TRACE{topic = undefined, clientid = undefined}} -> - {error, "topic/clientid cannot be both empty"}; - {ok, Trace} -> - case fill_default(Trace) of + {error, "type=[topic,clientid,ip_address] required"}; + {ok, #?TRACE{filter = undefined}} -> + {error, "topic/clientid/ip_address filter required"}; + {ok, TraceRec0} -> + case fill_default(TraceRec0) of #?TRACE{start_at = Start, end_at = End} when End =< Start -> {error, "failed by start_at >= end_at"}; - Trace1 -> {ok, Trace1} + TraceRec -> {ok, TraceRec} end end. +ensure_proplists(#{} = Trace) -> maps:to_list(Trace); +ensure_proplists(Trace) when is_list(Trace) -> + lists:foldl( + fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc]; + ({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc]; + (_, Acc) -> Acc + end, [], Trace). + fill_default(Trace = #?TRACE{start_at = undefined}) -> fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> @@ -395,27 +402,35 @@ fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> fill_default(Trace) -> Trace. to_trace([], Rec) -> {ok, Rec}; -to_trace([{<<"name">>, Name} | Trace], Rec) -> +to_trace([{name, Name} | Trace], Rec) -> case binary:match(Name, [<<"/">>], []) of - nomatch -> to_trace(Trace, Rec#?TRACE{name = Name}); + nomatch when byte_size(Name) < 200 -> to_trace(Trace, Rec#?TRACE{name = Name}); + nomatch -> {error, "name(latin1) length must < 200"}; _ -> {error, "name cannot contain /"} end; -to_trace([{<<"type">>, Type} | Trace], Rec) -> - case lists:member(Type, [<<"clientid">>, <<"topic">>]) of +to_trace([{type, Type} | Trace], Rec) -> + case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); - false -> {error, "incorrect type: only support clientid/topic"} + false -> {error, "incorrect type: only support clientid/topic/ip_address"} end; -to_trace([{<<"topic">>, Topic} | Trace], Rec) -> +to_trace([{topic, Topic} | Trace], Rec) -> case validate_topic(Topic) of - ok -> to_trace(Trace, Rec#?TRACE{topic = Topic}); + ok -> to_trace(Trace, Rec#?TRACE{filter = Topic}); {error, Reason} -> {error, Reason} end; -to_trace([{<<"start_at">>, StartAt} | Trace], Rec) -> +to_trace([{clientid, ClientId} | Trace], Rec) -> + to_trace(Trace, Rec#?TRACE{filter = ClientId}); +to_trace([{ip_address, IP} | Trace], Rec) -> + case inet:parse_address(binary_to_list(IP)) of + {ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)}); + {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} + end; +to_trace([{start_at, StartAt} | Trace], Rec) -> case to_system_second(StartAt) of {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); {error, Reason} -> {error, Reason} end; -to_trace([{<<"end_at">>, EndAt} | Trace], Rec) -> +to_trace([{end_at, EndAt} | Trace], Rec) -> Now = erlang:system_time(second), case to_system_second(EndAt) of {ok, Sec} when Sec > Now -> @@ -425,21 +440,14 @@ to_trace([{<<"end_at">>, EndAt} | Trace], Rec) -> {error, Reason} -> {error, Reason} end; -to_trace([{<<"clientid">>, ClientId} | Trace], Rec) -> - to_trace(Trace, Rec#?TRACE{clientid = ClientId}); -to_trace([{<<"packets">>, PacketList} | Trace], Rec) -> - case to_packets(PacketList) of - {ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets}); - {error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])} - end; to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. validate_topic(TopicName) -> - try emqx_topic:validate(name, TopicName) of + try emqx_topic:validate(filter, TopicName) of true -> ok catch error:Error -> - {error, io_lib:format("~s invalid by ~p", [TopicName, Error])} + {error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])} end. to_system_second(At) -> @@ -450,14 +458,6 @@ to_system_second(At) -> {error, ["The rfc3339 specification not satisfied: ", At]} end. -to_packets(Packets) when is_list(Packets) -> - AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets), - case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of - [] -> {ok, AtomTypes}; - InvalidE -> {error, InvalidE} - end; -to_packets(Packets) -> {error, Packets}. - zip_dir() -> trace_dir() ++ "zip/". diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index e6c87d69f..7d982c00d 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -27,18 +27,37 @@ , download_zip_log/2 , stream_log_file/2 ]). --export([read_trace_file/3]). +-export([ read_trace_file/3 + , get_trace_size/0 + ]). -define(TO_BIN(_B_), iolist_to_binary(_B_)). --define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])}). +-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}). -list_trace(_, Params) -> - List = - case Params of - [{<<"enable">>, Enable}] -> emqx_trace:list(binary_to_existing_atom(Enable)); - _ -> emqx_trace:list() - end, - {ok, emqx_trace:format(List)}. +list_trace(_, _Params) -> + case emqx_trace:list() of + [] -> {ok, []}; + List0 -> + List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List0), + Nodes = ekka_mnesia:running_nodes(), + TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000), + AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), + Now = erlang:system_time(second), + Traces = + lists:map(fun(Trace = #{name := Name, start_at := Start, + end_at := End, enable := Enable, type := Type, filter := Filter}) -> + FileName = emqx_trace:filename(Name, Start), + LogSize = collect_file_size(Nodes, FileName, AllFileSize), + Trace0 = maps:without([enable, filter], Trace), + Trace0#{ log_size => LogSize + , Type => Filter + , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) + , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) + , status => status(Enable, Start, End, Now) + } + end, emqx_trace:format(List)), + {ok, Traces} + end. create_trace(_, Param) -> case emqx_trace:create(Param) of @@ -97,28 +116,48 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> end, [], TraceFiles). collect_trace_file(TraceLog) -> + cluster_call(emqx_trace, trace_file, [TraceLog], 60000). + +cluster_call(Mod, Fun, Args, Timeout) -> Nodes = ekka_mnesia:running_nodes(), - {Files, BadNodes} = rpc:multicall(Nodes, emqx_trace, trace_file, [TraceLog], 60000), - BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), - Files. + {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), + BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]), + GoodRes. stream_log_file(#{name := Name}, Params) -> Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), Position0 = proplists:get_value(<<"position">>, Params, <<"0">>), Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>), - Node = binary_to_existing_atom(Node0), - Position = binary_to_integer(Position0), - Bytes = binary_to_integer(Bytes0), - case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of - {ok, Bin} -> - Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, - {ok, #{meta => Meta, items => Bin}}; - {eof, Size} -> - Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, - {ok, #{meta => Meta, items => <<"">>}}; - {error, Reason} -> - logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), - {error, Reason} + case to_node(Node0) of + {ok, Node} -> + Position = binary_to_integer(Position0), + Bytes = binary_to_integer(Bytes0), + case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + {ok, Bin} -> + Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, + {ok, #{meta => Meta, items => Bin}}; + {eof, Size} -> + Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, + {ok, #{meta => Meta, items => <<"">>}}; + {error, Reason} -> + logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), + {error, Reason}; + {badrpc, nodedown} -> + {error, "BadRpc node down"} + end; + {error, Reason} -> {error, Reason} + end. + +get_trace_size() -> + TraceDir = emqx_trace:trace_dir(), + Node = node(), + case file:list_dir(TraceDir) of + {ok, AllFiles} -> + lists:foldl(fun(File, Acc) -> + FullFileName = filename:join(TraceDir, File), + Acc#{{Node, File} => filelib:file_size(FullFileName)} + end, #{}, lists:delete("zip", AllFiles)); + _ -> #{} end. %% this is an rpc call for stream_log_file/2 @@ -134,7 +173,6 @@ read_trace_file(Name, Position, Limit) -> [] -> {error, not_found} end. --dialyzer({nowarn_function, read_file/3}). read_file(Path, Offset, Bytes) -> {ok, IoDevice} = file:open(Path, [read, raw, binary]), try @@ -146,9 +184,27 @@ read_file(Path, Offset, Bytes) -> {ok, Bin} -> {ok, Bin}; {error, Reason} -> {error, Reason}; eof -> - #file_info{size = Size} = file:read_file_info(IoDevice), + {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), {eof, Size} end after file:close(IoDevice) end. + +to_node(Node) -> + try {ok, binary_to_existing_atom(Node)} + catch _:_ -> + {error, "node not found"} + end. + +collect_file_size(Nodes, FileName, AllFiles) -> + lists:foldl(fun(Node, Acc) -> + Size = maps:get({Node, FileName}, AllFiles, 0), + Acc#{Node => Size} + end, #{}, Nodes). + +%% status(false, _Start, End, Now) when End > Now -> <<"stopped">>; +status(false, _Start, _End, _Now) -> <<"stopped">>; +status(true, Start, _End, Now) when Now < Start -> <<"waiting">>; +status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; +status(true, _Start, _End, _Now) -> <<"running">>. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index 169fd50bc..ffa2bc1fb 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -23,21 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). --record(emqx_trace, { - name, - type, - topic, - clientid, - packets = [], - enable = true, - start_at, - end_at, - log_size = #{} - }). - --define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL' - , 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK' - , 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']). +-record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}). %%-------------------------------------------------------------------- %% Setups @@ -61,16 +47,14 @@ t_base_create_delete(_Config) -> End = to_rfc3339(Now + 30 * 60), Name = <<"name1">>, ClientId = <<"test-device">>, - Packets = [atom_to_binary(E) || E <- ?PACKETS], - Trace = [ - {<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, - {<<"clientid">>, ClientId}, - {<<"packets">>, Packets}, - {<<"start_at">>, Start}, - {<<"end_at">>, End} - ], - AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}), + Trace = #{ + name => Name, + type => <<"clientid">>, + clientid => ClientId, + start_at => Start, + end_at => End + }, + AnotherTrace = Trace#{name => <<"anotherTrace">>}, ok = emqx_trace:create(Trace), ?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)), ?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)), @@ -78,24 +62,19 @@ t_base_create_delete(_Config) -> Expect = #emqx_trace{ name = Name, type = clientid, - topic = undefined, - clientid = ClientId, - packets = ?PACKETS, + filter = ClientId, start_at = Now, end_at = Now + 30 * 60 }, ?assertEqual(Expect, TraceRec), ExpectFormat = [ #{ - clientid => <<"test-device">>, + filter => <<"test-device">>, enable => true, type => clientid, - packets => ?PACKETS, name => <<"name1">>, - start_at => Start, - end_at => End, - log_size => #{}, - topic => undefined + start_at => Now, + end_at => Now + 30 * 60 } ], ?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])), @@ -108,13 +87,12 @@ t_create_size_max(_Config) -> emqx_trace:clear(), lists:map(fun(Seq) -> Name = list_to_binary("name" ++ integer_to_list(Seq)), - Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, - {<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], + Trace = [{name, Name}, {type, <<"topic">>}, + {topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], ok = emqx_trace:create(Trace) end, lists:seq(1, 30)), - Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}], + Trace31 = [{<<"name">>, <<"name31">>}, + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}], {error, _} = emqx_trace:create(Trace31), ok = emqx_trace:delete(<<"name30">>), ok = emqx_trace:create(Trace31), @@ -125,11 +103,11 @@ t_create_failed(_Config) -> ok = emqx_trace:clear(), UnknownField = [{<<"unknown">>, 12}], {error, Reason1} = emqx_trace:create(UnknownField), - ?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)), + ?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)), InvalidTopic = [{<<"topic">>, "#/#//"}], {error, Reason2} = emqx_trace:create(InvalidTopic), - ?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)), + ?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)), InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], {error, Reason3} = emqx_trace:create(InvalidStart), @@ -141,41 +119,34 @@ t_create_failed(_Config) -> ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, iolist_to_binary(Reason4)), - InvalidPackets = [{<<"packets">>, [<<"publish">>]}], - {error, Reason5} = emqx_trace:create(InvalidPackets), - ?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)), - - InvalidPackets2 = [{<<"packets">>, <<"publish">>}], - {error, Reason6} = emqx_trace:create(InvalidPackets2), - ?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)), - {error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), - ?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)), + ?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)), InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}], {error, Reason9} = emqx_trace:create(InvalidPackets4), ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)), - ?assertEqual({error, "type required"}, emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"packets">>, []}, {<<"clientid">>, <<"good">>}])), + ?assertEqual({error, "type=[topic,clientid,ip_address] required"}, + emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])), - ?assertEqual({error, "incorrect type: only support clientid/topic"}, + ?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"}, emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), + {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), + + ?assertEqual({error, "ip address: einval"}, + emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])), ok. t_create_default(_Config) -> ok = emqx_trace:clear(), {error, "name required"} = emqx_trace:create([]), ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]), - [#emqx_trace{packets = Packets}] = emqx_trace:list(), - ?assertEqual([], Packets), + {<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]), + [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(), ok = emqx_trace:clear(), Trace = [ {<<"name">>, <<"test-name">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, @@ -186,15 +157,13 @@ t_create_default(_Config) -> Trace2 = [ {<<"name">>, <<"test-name">>}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}, {<<"start_at">>, to_rfc3339(Now + 10)}, {<<"end_at">>, to_rfc3339(Now + 3)} ], {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2), ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]), + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]), [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(), ?assertEqual(10 * 60, End - Start), ?assertEqual(true, Start - erlang:system_time(second) < 5), @@ -205,8 +174,8 @@ t_update_enable(_Config) -> Name = <<"test-name">>, Now = erlang:system_time(second), End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), - ok = emqx_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]}, - {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), + ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), [#emqx_trace{enable = Enable}] = emqx_trace:list(), ?assertEqual(Enable, true), ok = emqx_trace:update(Name, false), @@ -293,13 +262,10 @@ t_get_log_filename(_Config) -> Start = calendar:system_time_to_rfc3339(Now), End = calendar:system_time_to_rfc3339(Now + 2), Name = <<"name1">>, - ClientId = <<"test-device">>, - Packets = [atom_to_binary(E) || E <- ?PACKETS], Trace = [ {<<"name">>, Name}, - {<<"type">>, <<"clientid">>}, - {<<"clientid">>, ClientId}, - {<<"packets">>, Packets}, + {<<"type">>, <<"ip_address">>}, + {<<"ip_address">>, <<"127.0.0.1">>}, {<<"start_at">>, list_to_binary(Start)}, {<<"end_at">>, list_to_binary(End)} ], diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl index 1daab1520..0b2963af6 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -19,7 +19,7 @@ %% API -export([ list_trace/2 , create_trace/2 - , update_trace/2 + , disable_trace/2 , delete_trace/2 , clear_traces/2 , download_zip_log/2 @@ -52,11 +52,11 @@ func => clear_traces, descr => "clear all traces"}). --rest_api(#{name => update_trace, +-rest_api(#{name => disable_trace, method => 'PUT', - path => "/trace/:bin:name/:atom:operation", - func => update_trace, - descr => "diable/enable trace"}). + path => "/trace/:bin:name/stop", + func => disable_trace, + descr => "stop trace"}). -rest_api(#{name => download_zip_log, method => 'GET', @@ -82,8 +82,8 @@ delete_trace(Path, Params) -> clear_traces(Path, Params) -> return(emqx_trace_api:clear_traces(Path, Params)). -update_trace(Path, Params) -> - return(emqx_trace_api:update_trace(Path, Params)). +disable_trace(#{name := Name}, Params) -> + return(emqx_trace_api:update_trace(#{name => Name, operation => disable}, Params)). download_zip_log(Path, Params) -> case emqx_trace_api:download_zip_log(Path, Params) of diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl index fc786dbd0..36ceb8c49 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl @@ -52,14 +52,13 @@ t_http_test(_Config) -> %% create ErrorTrace = #{}, {ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace), - ?assertEqual(#{<<"message">> => <<"unknown field: {}">>, + ?assertEqual(#{<<"message">> => <<"name required">>, <<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)), Name = <<"test-name">>, Trace = [ {<<"name">>, Name}, {<<"type">>, <<"topic">>}, - {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>} ], @@ -71,14 +70,23 @@ t_http_test(_Config) -> ?assertEqual(Name, maps:get(<<"name">>, Data)), %% update - {ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}), + {ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}), ?assertEqual(#{<<"code">> => 0, <<"data">> => #{<<"enable">> => false, <<"name">> => <<"test-name">>}}, json(Update)), {ok, List1} = request_api(get, api_path("trace"), Header), #{<<"code">> := 0, <<"data">> := [Data1]} = json(List1), - ?assertEqual(false, maps:get(<<"enable">>, Data1)), + Node = atom_to_binary(node()), + ?assertMatch(#{ + <<"status">> := <<"stopped">>, + <<"name">> := <<"test-name">>, + <<"log_size">> := #{Node := _}, + <<"start_at">> := _, + <<"end_at">> := _, + <<"type">> := <<"topic">>, + <<"topic">> := <<"/x/y/z">> + }, Data1), %% delete {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), @@ -86,7 +94,7 @@ t_http_test(_Config) -> {ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header), ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>, - <<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)), + <<"message">> => <<"test-name NOT FOUND">>}, json(DeleteNotFound)), {ok, List2} = request_api(get, api_path("trace"), Header), ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)), diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 14617e2e1..bcf343432 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -126,8 +126,9 @@ subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), - _ = emqx_tracer:trace_subscribe(Topic, SubId, SubOpts), - case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of + _ = emqx_trace:subscribe(Topic, SubId, SubOpts), + SubPid = self(), + case ets:member(?SUBOPTION, {SubPid, Topic}) of false -> %% New ok = emqx_broker_helper:register_sub(SubPid, SubId), do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); @@ -172,7 +173,7 @@ unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), case ets:lookup(?SUBOPTION, {SubPid, Topic}) of [{_, SubOpts}] -> - emqx_tracer:trace_unsubscribe(Topic, SubOpts), + emqx_trace:unsubscribe(Topic, SubOpts), _ = emqx_broker_helper:reclaim_seq(Topic), do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok @@ -195,7 +196,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> -spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> - _ = emqx_tracer:trace_publish(Msg), + _ = emqx_trace:publish(Msg), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 4c49a1cb3..bad053845 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -275,7 +275,7 @@ take_ws_cookie(ClientInfo, ConnInfo) -> handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) -> handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); -handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> +handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> case pipeline([fun enrich_conninfo/2, fun run_conn_hooks/2, fun check_connect/2, @@ -285,6 +285,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> + ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), NChannel1 = NChannel#channel{ will_msg = emqx_packet:will_msg(NConnPkt), alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index a733b0d3a..9cf7050ea 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -18,6 +18,8 @@ -compile({no_auto_import, [error/1]}). +-elvis([{elvis_style, god_modules, disable}]). + %% Logs -export([ debug/1 , debug/2 @@ -64,10 +66,11 @@ id := logger:handler_id(), level := logger:level(), dst := logger_dst(), + filters := [{logger:filter_id(), logger:filter()}], status := started | stopped }). --define(stopped_handlers, {?MODULE, stopped_handlers}). +-define(STOPPED_HANDLERS, {?MODULE, stopped_handlers}). %%-------------------------------------------------------------------- %% APIs @@ -171,19 +174,19 @@ get_log_handlers() -> -spec(get_log_handlers(started | stopped) -> [logger_handler_info()]). get_log_handlers(started) -> - [log_hanlder_info(Conf, started) || Conf <- logger:get_handler_config()]; + [log_handler_info(Conf, started) || Conf <- logger:get_handler_config()]; get_log_handlers(stopped) -> - [log_hanlder_info(Conf, stopped) || Conf <- list_stopped_handler_config()]. + [log_handler_info(Conf, stopped) || Conf <- list_stopped_handler_config()]. -spec(get_log_handler(logger:handler_id()) -> logger_handler_info()). get_log_handler(HandlerId) -> case logger:get_handler_config(HandlerId) of {ok, Conf} -> - log_hanlder_info(Conf, started); + log_handler_info(Conf, started); {error, _} -> case read_stopped_handler_config(HandlerId) of error -> {error, {not_found, HandlerId}}; - {ok, Conf} -> log_hanlder_info(Conf, stopped) + {ok, Conf} -> log_handler_info(Conf, stopped) end end. @@ -245,21 +248,21 @@ parse_transform(AST, _Opts) -> %% Internal Functions %%-------------------------------------------------------------------- -log_hanlder_info(#{id := Id, level := Level, module := logger_std_h, - config := #{type := Type}}, Status) when +log_handler_info(#{id := Id, level := Level, module := logger_std_h, + filters := Filters, config := #{type := Type}}, Status) when Type =:= standard_io; Type =:= standard_error -> - #{id => Id, level => Level, dst => console, status => Status}; -log_hanlder_info(#{id := Id, level := Level, module := logger_std_h, - config := Config = #{type := file}}, Status) -> - #{id => Id, level => Level, status => Status, + #{id => Id, level => Level, dst => console, status => Status, filters => Filters}; +log_handler_info(#{id := Id, level := Level, module := logger_std_h, + filters := Filters, config := Config = #{type := file}}, Status) -> + #{id => Id, level => Level, status => Status, filters => Filters, dst => maps:get(file, Config, atom_to_list(Id))}; -log_hanlder_info(#{id := Id, level := Level, module := logger_disk_log_h, - config := #{file := Filename}}, Status) -> - #{id => Id, level => Level, dst => Filename, status => Status}; -log_hanlder_info(#{id := Id, level := Level, module := _OtherModule}, Status) -> - #{id => Id, level => Level, dst => unknown, status => Status}. +log_handler_info(#{id := Id, level := Level, module := logger_disk_log_h, + filters := Filters, config := #{file := Filename}}, Status) -> + #{id => Id, level => Level, dst => Filename, status => Status, filters => Filters}; +log_handler_info(#{id := Id, level := Level, filters := Filters}, Status) -> + #{id => Id, level => Level, dst => unknown, status => Status, filters => Filters}. %% set level for all log handlers in one command set_all_log_handlers_level(Level) -> @@ -281,29 +284,29 @@ rollback([{ID, Level} | List]) -> rollback([]) -> ok. save_stopped_handler_config(HandlerId, Config) -> - case persistent_term:get(?stopped_handlers, undefined) of + case persistent_term:get(?STOPPED_HANDLERS, undefined) of undefined -> - persistent_term:put(?stopped_handlers, #{HandlerId => Config}); + persistent_term:put(?STOPPED_HANDLERS, #{HandlerId => Config}); ConfList -> - persistent_term:put(?stopped_handlers, ConfList#{HandlerId => Config}) + persistent_term:put(?STOPPED_HANDLERS, ConfList#{HandlerId => Config}) end. read_stopped_handler_config(HandlerId) -> - case persistent_term:get(?stopped_handlers, undefined) of + case persistent_term:get(?STOPPED_HANDLERS, undefined) of undefined -> error; ConfList -> maps:find(HandlerId, ConfList) end. remove_stopped_handler_config(HandlerId) -> - case persistent_term:get(?stopped_handlers, undefined) of + case persistent_term:get(?STOPPED_HANDLERS, undefined) of undefined -> ok; ConfList -> case maps:find(HandlerId, ConfList) of error -> ok; {ok, _} -> - persistent_term:put(?stopped_handlers, maps:remove(HandlerId, ConfList)) + persistent_term:put(?STOPPED_HANDLERS, maps:remove(HandlerId, ConfList)) end end. list_stopped_handler_config() -> - case persistent_term:get(?stopped_handlers, undefined) of + case persistent_term:get(?STOPPED_HANDLERS, undefined) of undefined -> []; ConfList -> maps:values(ConfList) end. diff --git a/src/emqx_trace_handler.erl b/src/emqx_trace_handler.erl new file mode 100644 index 000000000..e4008405a --- /dev/null +++ b/src/emqx_trace_handler.erl @@ -0,0 +1,206 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_trace_handler). + +-include("emqx.hrl"). +-include("logger.hrl"). + +-logger_header("[Tracer]"). + +%% APIs +-export([ running/0 + , install/3 + , install/4 + , uninstall/1 + , uninstall/2 + ]). + +%% For logger handler filters callbacks +-export([ filter_clientid/2 + , filter_topic/2 + , filter_ip_address/2 + ]). + +-type tracer() :: #{ + name := binary(), + type := clientid | topic | ip_address, + filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address() + }. + +-define(FORMAT, + {logger_formatter, #{ + template => [ + time, " [", level, "] ", + {clientid, + [{peername, [clientid, "@", peername, " "], [clientid, " "]}], + [{peername, [peername, " "], []}] + }, + msg, "\n" + ], + single_line => false, + max_size => unlimited, + depth => unlimited + }} +). + +-define(CONFIG(_LogFile_), #{ + type => halt, + file => _LogFile_, + max_no_bytes => 512 * 1024 * 1024, + overload_kill_enable => true, + overload_kill_mem_size => 50 * 1024 * 1024, + overload_kill_qlen => 20000, + %% disable restart + overload_kill_restart_after => infinity + }). + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + +-spec install(Name :: binary() | list(), + Type :: clientid | topic | ip_address, + Filter ::emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string()) -> ok | {error, term()}. +install(Name, Type, Filter, Level, LogFile) -> + Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)}, + install(Who, Level, LogFile). + +-spec install(Type :: clientid | topic | ip_address, + Filter ::emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string()) -> ok | {error, term()}. +install(Type, Filter, Level, LogFile) -> + install(Filter, Type, Filter, Level, LogFile). + +-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}. +install(Who, all, LogFile) -> + install(Who, debug, LogFile); +install(Who, Level, LogFile) -> + PrimaryLevel = emqx_logger:get_primary_log_level(), + try logger:compare_levels(Level, PrimaryLevel) of + lt -> + {error, + io_lib:format( + "Cannot trace at a log level (~s) " + "lower than the primary log level (~s)", + [Level, PrimaryLevel] + )}; + _GtOrEq -> + install_handler(Who, Level, LogFile) + catch + error:badarg -> + {error, {invalid_log_level, Level}} + end. + +-spec uninstall(Type :: clientid | topic | ip_address, + Name :: binary() | list()) -> ok | {error, term()}. +uninstall(Type, Name) -> + HandlerId = handler_id(#{type => Type, name => ensure_bin(Name)}), + uninstall(HandlerId). + +-spec uninstall(HandlerId :: atom()) -> ok | {error, term()}. +uninstall(HandlerId) -> + Res = logger:remove_handler(HandlerId), + show_prompts(Res, HandlerId, "Stop trace"), + Res. + +%% @doc Return all running trace handlers information. +-spec running() -> + [ + #{ + name => binary(), + type => topic | clientid | ip_address, + id => atom(), + filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(), + level => logger:level(), + dst => file:filename() | console | unknown + } + ]. +running() -> + lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). + +-spec filter_clientid(logger:log_event(), string()) -> logger:log_event() | ignore. +filter_clientid(#{meta := #{clientid := ClientId}} = Log, ClientId) -> Log; +filter_clientid(_Log, _ExpectId) -> ignore. + +-spec filter_topic(logger:log_event(), string()) -> logger:log_event() | ignore. +filter_topic(#{meta := #{topic := Topic}} = Log, TopicFilter) -> + case emqx_topic:match(Topic, TopicFilter) of + true -> Log; + false -> ignore + end; +filter_topic(_Log, _ExpectId) -> ignore. + +-spec filter_ip_address(logger:log_event(), string()) -> logger:log_event() | ignore. +filter_ip_address(#{meta := #{peername := Peername}} = Log, IP) -> + case lists:prefix(IP, Peername) of + true -> Log; + false -> ignore + end; +filter_ip_address(_Log, _ExpectId) -> ignore. + +install_handler(Who, Level, LogFile) -> + HandlerId = handler_id(Who), + Config = #{ + level => Level, + formatter => ?FORMAT, + filter_default => stop, + filters => filters(Who), + config => ?CONFIG(LogFile) + }, + Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), + show_prompts(Res, Who, "Start trace"), + Res. + +filters(#{type := clientid, filter := Filter}) -> + [{clientid, {fun ?MODULE:filter_clientid/2, ensure_list(Filter)}}]; +filters(#{type := topic, filter := Filter}) -> + [{topic, {fun ?MODULE:filter_topic/2, ensure_bin(Filter)}}]; +filters(#{type := ip_address, filter := Filter}) -> + [{ip_address, {fun ?MODULE:filter_ip_address/2, ensure_list(Filter)}}]. + +filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) -> + Init = #{id => Id, level => Level, dst => Dst}, + case Filters of + [{topic, {_FilterFun, Filter}}] -> + <<"trace_topic_", Name/binary>> = atom_to_binary(Id), + [Init#{type => topic, filter => Filter, name => Name} | Acc]; + [{clientid, {_FilterFun, Filter}}] -> + <<"trace_clientid_", Name/binary>> = atom_to_binary(Id), + [Init#{type => clientid, filter => Filter, name => Name} | Acc]; + [{ip_address, {_FilterFun, Filter}}] -> + <<"trace_ip_address_", Name/binary>> = atom_to_binary(Id), + [Init#{type => ip_address, filter => Filter, name => Name} | Acc]; + _ -> + Acc + end. + +handler_id(#{type := Type, name := Name}) -> + binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>). + +ensure_bin(List) when is_list(List) -> iolist_to_binary(List); +ensure_bin(Bin) when is_binary(Bin) -> Bin. + +ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin); +ensure_list(List) when is_list(List) -> List. + +show_prompts(ok, Who, Msg) -> + ?LOG(info, Msg ++ " ~p " ++ "successfully~n", [Who]); +show_prompts({error, Reason}, Who, Msg) -> + ?LOG(error, Msg ++ " ~p " ++ "failed by ~p~n", [Who, Reason]). diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl deleted file mode 100644 index e563c8eab..000000000 --- a/src/emqx_tracer.erl +++ /dev/null @@ -1,261 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_tracer). - --include("emqx.hrl"). --include("logger.hrl"). - --logger_header("[Tracer]"). - -%% APIs --export([ trace_publish/1 - , trace_subscribe/3 - , trace_unsubscribe/2 - , start_trace/3 - , start_trace/4 - , lookup_traces/0 - , stop_trace/3 - , stop_trace/2 - ]). - --ifdef(TEST). --export([is_match/3]). --endif. - --type(label() :: 'CONNECT' | 'CONNACK' | 'PUBLISH' | 'PUBACK' | 'PUBREC' | - 'PUBREL' | 'PUBCOMP' | 'SUBSCRIBE' | 'SUBACK' | 'UNSUBSCRIBE' | - 'UNSUBACK' | 'PINGREQ' | 'PINGRESP' | 'DISCONNECT' | 'AUTH'). - --type(tracer() :: #{name := binary(), - type := clientid | topic, - clientid => emqx_types:clientid(), - topic => emqx_types:topic(), - labels := [label()]}). - --define(TRACER, ?MODULE). --define(FORMAT, {logger_formatter, - #{template => - [time, " [", level, "] ", - {clientid, - [{peername, - [clientid, "@", peername, " "], - [clientid, " "]}], - [{peername, - [peername, " "], - []}]}, - msg, "\n"], - single_line => false - }}). --define(TOPIC_COMBINATOR, <<"_trace_topic_">>). --define(CLIENTID_COMBINATOR, <<"_trace_clientid_">>). --define(TOPIC_TRACE_ID(T, N), - binary_to_atom(<<(N)/binary, ?TOPIC_COMBINATOR/binary, (T)/binary>>)). --define(CLIENT_TRACE_ID(C, N), - binary_to_atom(<<(N)/binary, ?CLIENTID_COMBINATOR/binary, (C)/binary>>)). --define(TOPIC_TRACE(T, N, M), {topic, T, N, M}). --define(CLIENT_TRACE(C, N, M), {clientid, C, N, M}). --define(TOPIC_TRACE(T, N), {topic, T, N}). --define(CLIENT_TRACE(C, N), {clientid, C, N}). - --define(IS_LOG_LEVEL(L), - L =:= emergency orelse - L =:= alert orelse - L =:= critical orelse - L =:= error orelse - L =:= warning orelse - L =:= notice orelse - L =:= info orelse - L =:= debug). - -%%------------------------------------------------------------------------------ -%% APIs -%%------------------------------------------------------------------------------ -trace_publish(#message{topic = <<"$SYS/", _/binary>>}) -> - %% Do not trace '$SYS' publish - ignore; -trace_publish(#message{from = From, topic = Topic, payload = Payload}) - when is_binary(From); is_atom(From) -> - emqx_logger:info(#{topic => Topic, - mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, - "PUBLISH to ~s: ~0p", [Topic, Payload]). - -trace_subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; -trace_subscribe(Topic, SubId, SubOpts) -> - emqx_logger:info(#{topic => Topic, - mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts SUBSCRIBE ~ts: Options: ~0p", [SubId, Topic, SubOpts]). - -trace_unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; -trace_unsubscribe(Topic, SubOpts) -> - emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts UNSUBSCRIBE ~ts: Options: ~0p", - [maps:get(subid, SubOpts, ""), Topic, SubOpts]). - --spec(start_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(), - logger:level() | all, string()) -> ok | {error, term()}). -start_trace(clientid, ClientId0, Level, LogFile) -> - ClientId = ensure_bin(ClientId0), - Who = #{type => clientid, clientid => ClientId, name => ClientId, labels => []}, - start_trace(Who, Level, LogFile); -start_trace(topic, Topic0, Level, LogFile) -> - Topic = ensure_bin(Topic0), - Who = #{type => topic, topic => Topic, name => Topic, labels => []}, - start_trace(Who, Level, LogFile). - -%% @doc Start to trace clientid or topic. --spec(start_trace(tracer(), logger:level() | all, string()) -> ok | {error, term()}). -start_trace(Who, all, LogFile) -> - start_trace(Who, debug, LogFile); -start_trace(Who, Level, LogFile) -> - case ?IS_LOG_LEVEL(Level) of - true -> - PrimaryLevel = emqx_logger:get_primary_log_level(), - try logger:compare_levels(Level, PrimaryLevel) of - lt -> - {error, - io_lib:format("Cannot trace at a log level (~s) " - "lower than the primary log level (~s)", - [Level, PrimaryLevel])}; - _GtOrEq -> - install_trace_handler(Who, Level, LogFile) - catch - _:Error -> - {error, Error} - end; - false -> {error, {invalid_log_level, Level}} - end. - --spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic()) -> - ok | {error, term()}). -stop_trace(Type, ClientIdOrTopic) -> - stop_trace(Type, ClientIdOrTopic, ClientIdOrTopic). - -%% @doc Stop tracing clientid or topic. --spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(), binary()) -> - ok | {error, term()}). -stop_trace(clientid, ClientId, Name) -> - Who = #{type => clientid, clientid => ensure_bin(ClientId), name => ensure_bin(Name)}, - uninstall_trance_handler(Who); -stop_trace(topic, Topic, Name) -> - Who = #{type => topic, topic => ensure_bin(Topic), name => ensure_bin(Name)}, - uninstall_trance_handler(Who). - -%% @doc Lookup all traces --spec(lookup_traces() -> [#{ type => topic | clientid, - name => binary(), - topic => emqx_types:topic(), - level => logger:level(), - dst => file:filename() | console | unknown - }]). -lookup_traces() -> - lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). - -install_trace_handler(Who, Level, LogFile) -> - case logger:add_handler(handler_id(Who), logger_disk_log_h, - #{level => Level, - formatter => ?FORMAT, - config => #{type => halt, file => LogFile}, - filter_default => stop, - filters => [{meta_key_filter, - {fun filter_by_meta_key/2, Who}}]}) - of - ok -> - ?LOG(info, "Start trace for ~p", [Who]), - ok; - {error, Reason} -> - ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), - {error, Reason} - end. - -uninstall_trance_handler(Who) -> - case logger:remove_handler(handler_id(Who)) of - ok -> - ?LOG(info, "Stop trace for ~p", [Who]), - ok; - {error, Reason} -> - ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]), - {error, Reason} - end. - -filter_traces(#{id := Id, level := Level, dst := Dst}, Acc) -> - IdStr = atom_to_binary(Id), - case binary:split(IdStr, [?TOPIC_COMBINATOR]) of - [Name, Topic] -> - [#{ type => topic, - name => Name, - topic => Topic, - level => Level, - dst => Dst} | Acc]; - _ -> - case binary:split(IdStr, [?CLIENTID_COMBINATOR]) of - [Name, ClientId] -> - [#{ type => clientid, - name => Name, - clientid => ClientId, - level => Level, - dst => Dst} | Acc]; - _ -> Acc - end - end. - -%% Plan to support topic_and_client type, so we have type field. -handler_id(#{type := topic, topic := Topic, name := Name}) -> - ?TOPIC_TRACE_ID(format(Topic), format(Name)); -handler_id(#{type := clientid, clientid := ClientId, name := Name}) -> - ?CLIENT_TRACE_ID(format(ClientId), format(Name)). - -filter_by_meta_key(#{meta := Meta, level := Level} = Log, Context) -> - case is_match(Context, Meta, Level) of - true -> Log; - false -> ignore - end. - -%% When the log level is higher than debug and clientid/topic is match, -%% it will be logged without judging the content inside the labels. -%% When the log level is debug, in addition to the matched clientid/topic, -%% you also need to determine whether the label is in the labels -is_match(#{type := clientid, clientid := ExpectId, labels := Labels}, - #{clientid := RealId} = Meta, - Level) -> - is_match(ExpectId =:= iolist_to_binary(RealId), Level, Meta, Labels); -is_match(#{type := topic, topic := TopicFilter, labels := Labels}, - #{topic := Topic} = Meta, Level) -> - is_match(emqx_topic:match(Topic, TopicFilter), Level, Meta, Labels); -is_match(_, _, _) -> - false. - -is_match(true, debug, Meta, Labels) -> is_match_labels(Meta, Labels); -is_match(Boolean, _, _Meta, _Labels) -> Boolean. - -is_match_labels(#{trace_label := 'ALL'}, _Context) -> true; -is_match_labels(_, []) -> true; -is_match_labels(#{trace_label := Packet}, Context) -> - lists:member(Packet, Context); -is_match_labels(_, _) -> false. - -format(List)when is_list(List) -> - format(list_to_binary(List)); -format(Atom)when is_atom(Atom) -> - format(atom_to_list(Atom)); -format(Bin0)when is_binary(Bin0) -> - case byte_size(Bin0) of - Size when Size =< 200 -> Bin0; - _ -> emqx_misc:bin2hexstr_a_f_upper(Bin0) - end. - -ensure_bin(List) when is_list(List) -> iolist_to_binary(List); -ensure_bin(Bin) when is_binary(Bin) -> Bin. diff --git a/test/emqx_trace_handler_SUITE.erl b/test/emqx_trace_handler_SUITE.erl new file mode 100644 index 000000000..5e909c45b --- /dev/null +++ b/test/emqx_trace_handler_SUITE.erl @@ -0,0 +1,191 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_trace_handler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). +-define(CLIENT, [{host, "localhost"}, + {clientid, <<"client">>}, + {username, <<"testuser">>}, + {password, <<"pass">>} + ]). + +all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address]. + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +init_per_testcase(t_trace_clientid, Config) -> + Config; +init_per_testcase(_Case, Config) -> + ok = emqx_logger:set_log_level(debug), + _ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()], + Config. + +end_per_testcase(_Case, _Config) -> + ok = emqx_logger:set_log_level(warning), + ok. + +t_trace_clientid(_Config) -> + %% Start tracing + emqx_logger:set_log_level(error), + {error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"), + emqx_logger:set_log_level(debug), + %% add list clientid + ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"), + ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"), + ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"), + {error, {invalid_log_level, bad_level}} = + emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"), + {error, {handler_not_added, {file_error, ".", eisdir}}} = + emqx_trace_handler:install(clientid, <<"client5">>, debug, "."), + ct:sleep(100), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/client.log")), + ?assert(filelib:is_regular("tmp/client2.log")), + ?assert(filelib:is_regular("tmp/client3.log")), + + %% Get current traces + ?assertEqual([#{type => clientid, filter => "client", name => <<"client">>, + id => trace_clientid_client, level => debug, dst => "tmp/client.log"}, + #{type => clientid, filter => "client2", name => <<"client2">>, + id => trace_clientid_client2, level => debug, dst => "tmp/client2.log"}, + #{type => clientid, filter => "client3", name => <<"client3">>, + id => trace_clientid_client3, level => debug, dst => "tmp/client3.log"} + ], emqx_trace_handler:running()), + + %% Client with clientid = "client" publishes a "hi" message to "a/b/c". + {ok, T} = emqtt:start_link(?CLIENT), + emqtt:connect(T), + emqtt:publish(T, <<"a/b/c">>, <<"hi">>), + emqtt:ping(T), + ct:sleep(200), + + %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log". + {ok, Bin} = file:read_file("tmp/client.log"), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])), + ?assert(filelib:file_size("tmp/client2.log") == 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(clientid, <<"client">>), + ok = emqx_trace_handler:uninstall(clientid, <<"client2">>), + ok = emqx_trace_handler:uninstall(clientid, <<"client3">>), + + emqtt:disconnect(T), + ?assertEqual([], emqx_trace_handler:running()). + +t_trace_topic(_Config) -> + {ok, T} = emqtt:start_link(?CLIENT), + emqtt:connect(T), + + %% Start tracing + emqx_logger:set_log_level(debug), + ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), + ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), + ct:sleep(100), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/topic_trace_x.log")), + ?assert(filelib:is_regular("tmp/topic_trace_y.log")), + + %% Get current traces + ?assertEqual([#{type => topic, filter => <<"x/#">>, id => 'trace_topic_x/#', + level => debug, dst => "tmp/topic_trace_x.log", name => <<"x/#">>}, + #{type => topic, filter => <<"y/#">>, id => 'trace_topic_y/#', + name => <<"y/#">>, level => debug, dst => "tmp/topic_trace_y.log"} + ], + emqx_trace_handler:running()), + + %% Client with clientid = "client" publishes a "hi" message to "x/y/z". + emqtt:publish(T, <<"x/y/z">>, <<"hi1">>), + emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), + emqtt:subscribe(T, <<"x/y/z">>), + emqtt:unsubscribe(T, <<"x/y/z">>), + ct:sleep(200), + + {ok, Bin} = file:read_file("tmp/topic_trace_x.log"), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])), + ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(topic, <<"x/#">>), + ok = emqx_trace_handler:uninstall(topic, <<"y/#">>), + {error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>), + ?assertEqual([], emqx_trace_handler:running()), + emqtt:disconnect(T). + +t_trace_ip_address(_Config) -> + {ok, T} = emqtt:start_link(?CLIENT), + emqtt:connect(T), + + %% Start tracing + ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"), + ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"), + ct:sleep(100), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/ip_trace_x.log")), + ?assert(filelib:is_regular("tmp/ip_trace_y.log")), + + %% Get current traces + ?assertEqual([#{type => ip_address, filter => "127.0.0.1", + id => 'trace_ip_address_127.0.0.1', name => <<"127.0.0.1">>, + level => debug, dst => "tmp/ip_trace_x.log"}, + #{type => ip_address, filter => "192.168.1.1", + id => 'trace_ip_address_192.168.1.1', name => <<"192.168.1.1">>, + level => debug, dst => "tmp/ip_trace_y.log"} + ], + emqx_trace_handler:running()), + + %% Client with clientid = "client" publishes a "hi" message to "x/y/z". + emqtt:publish(T, <<"x/y/z">>, <<"hi1">>), + emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), + emqtt:subscribe(T, <<"x/y/z">>), + emqtt:unsubscribe(T, <<"x/y/z">>), + ct:sleep(200), + + {ok, Bin} = file:read_file("tmp/ip_trace_x.log"), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])), + ?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>), + ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>), + {error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>), + emqtt:disconnect(T), + ?assertEqual([], emqx_trace_handler:running()). diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl deleted file mode 100644 index 5f7105774..000000000 --- a/test/emqx_tracer_SUITE.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_tracer_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --include_lib("common_test/include/ct.hrl"). --define(CLIENT, [{host, "localhost"}, - {clientid, <<"client">>}, - {username, <<"testuser">>}, - {password, <<"pass">>} - ]). - -all() -> [t_trace_clientid, t_trace_topic, t_is_match]. - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -t_trace_clientid(_Config) -> - {ok, T} = emqtt:start_link(?CLIENT), - emqtt:connect(T), - - %% Start tracing - emqx_logger:set_log_level(error), - {error, _} = emqx_tracer:start_trace(clientid, <<"client">>, debug, "tmp/client.log"), - emqx_logger:set_log_level(debug), - %% add list clientid - ok = emqx_tracer:start_trace(clientid, "client", debug, "tmp/client.log"), - ok = emqx_tracer:start_trace(clientid, <<"client2">>, all, "tmp/client2.log"), - ok = emqx_tracer:start_trace(clientid, <<"client3">>, all, "tmp/client3.log"), - {error, {invalid_log_level, bad_level}} = - emqx_tracer:start_trace(clientid, <<"client4">>, bad_level, "tmp/client4.log"), - {error, {handler_not_added, {file_error, ".", eisdir}}} = - emqx_tracer:start_trace(clientid, <<"client5">>, debug, "."), - ct:sleep(100), - - %% Verify the tracing file exits - ?assert(filelib:is_regular("tmp/client.log")), - ?assert(filelib:is_regular("tmp/client2.log")), - ?assert(filelib:is_regular("tmp/client3.log")), - - %% Get current traces - ?assertEqual([#{type => clientid, clientid => <<"client">>, - name => <<"client">>, level => debug, dst => "tmp/client.log"}, - #{type => clientid, clientid => <<"client2">>, - name => <<"client2">>, level => debug, dst => "tmp/client2.log"}, - #{type => clientid, clientid => <<"client3">>, - name => <<"client3">>, level => debug, dst => "tmp/client3.log"} - ], emqx_tracer:lookup_traces()), - - %% set the overall log level to debug - emqx_logger:set_log_level(debug), - - %% Client with clientid = "client" publishes a "hi" message to "a/b/c". - emqtt:publish(T, <<"a/b/c">>, <<"hi">>), - ct:sleep(200), - - %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log". - ?assert(filelib:file_size("tmp/client.log") > 0), - ?assert(filelib:file_size("tmp/client2.log") == 0), - - %% Stop tracing - ok = emqx_tracer:stop_trace(clientid, <<"client">>), - ok = emqx_tracer:stop_trace(clientid, <<"client2">>), - ok = emqx_tracer:stop_trace(clientid, <<"client3">>), - emqtt:disconnect(T), - - emqx_logger:set_log_level(warning). - -t_trace_topic(_Config) -> - {ok, T} = emqtt:start_link(?CLIENT), - emqtt:connect(T), - - %% Start tracing - emqx_logger:set_log_level(debug), - ok = emqx_tracer:start_trace(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), - ok = emqx_tracer:start_trace(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), - ct:sleep(100), - - %% Verify the tracing file exits - ?assert(filelib:is_regular("tmp/topic_trace_x.log")), - ?assert(filelib:is_regular("tmp/topic_trace_y.log")), - - %% Get current traces - ?assertEqual([#{type => topic, topic => <<"x/#">>, name => <<"x/#">>, - level => debug, dst => "tmp/topic_trace_x.log"}, - #{type => topic, topic => <<"y/#">>, name => <<"y/#">>, - level => debug, dst => "tmp/topic_trace_y.log"}], - emqx_tracer:lookup_traces()), - - %% set the overall log level to debug - emqx_logger:set_log_level(debug), - - %% Client with clientid = "client" publishes a "hi" message to "x/y/z". - emqtt:publish(T, <<"x/y/z">>, <<"hi1">>), - emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), - ct:sleep(200), - - ?assert(filelib:file_size("tmp/topic_trace_x.log") > 0), - ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0), - - %% Stop tracing - ok = emqx_tracer:stop_trace(topic, <<"x/#">>), - ok = emqx_tracer:stop_trace(topic, <<"y/#">>), - {error, _Reason} = emqx_tracer:stop_trace(topic, <<"z/#">>), - emqtt:disconnect(T), - - emqx_logger:set_log_level(warning). - -t_is_match(_Config) -> - ClientId = <<"test">>, - ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, - #{clientid => ClientId}, warning)), - ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, - #{clientid => ClientId}, debug)), - ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, - labels => ['PUBLISH']}, #{clientid => ClientId}, debug)), - ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, - #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), - ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['PUBLISH']}, - #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), - ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']}, - #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), - ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']}, - #{clientid => ClientId, trace_label => 'ALL'}, debug)), - ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, - #{clientid => <<"Bad">>}, warning)), - ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, - #{clientid => <<"Bad">>, trace_label => 'PUBLISH'}, debug)), - - Topic = <<"/test/#">>, - ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, - #{topic => <<"/test/1">>}, warning)), - ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, - #{topic => <<"/test/1/2">>}, debug)), - ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBSCRIBE']}, - #{topic => <<"/test/1/2">>}, debug)), - ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, - #{topic => <<"/test/3">>, trace_label => 'PUBLISH'}, debug)), - ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']}, - #{topic => <<"/test/398/">>, trace_label => 'PUBLISH'}, debug)), - ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBACK']}, - #{topic => <<"/test/1/xy/y">>, trace_label => 'PUBLISH'}, debug)), - - ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']}, - #{topic => <<"/t1est/398/">>, trace_label => 'PUBLISH'}, debug)), - ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, - #{topic => <<"/t1est/1/xy/y">>, trace_label => 'PUBLISH'}, debug)), - ok.