diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index cfed226cd..3b5a2387b 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -30,8 +30,7 @@ init([]) -> child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), child_spec(emqx_ctl, worker), - child_spec(emqx_zone, worker), - child_spec(emqx_tracer, worker)]}}. + child_spec(emqx_zone, worker)]}}. child_spec(M, worker) -> #{id => M, diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 21562930b..3479e9568 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -14,34 +14,19 @@ -module(emqx_tracer). --behaviour(gen_server). - -include("emqx.hrl"). -include("logger.hrl"). -logger_header("[Tracer]"). %% APIs --export([start_link/0]). - -export([ trace/2 , start_trace/3 , lookup_traces/0 , stop_trace/1 ]). -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --record(state, {traces}). - --type(trace_who() :: {client_id | topic, binary()}). +-type(trace_who() :: {client_id | topic, binary() | list()}). -define(TRACER, ?MODULE). -define(FORMAT, {emqx_logger_formatter, @@ -55,120 +40,100 @@ [peername," "], []}]}, msg,"\n"]}}). +-define(TOPIC_TRACE_ID(T), "trace_topic_"++T). +-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C). +-define(TOPIC_TRACE(T), {topic,T}). +-define(CLIENT_TRACE(C), {client_id,C}). + +-define(is_log_level(L), + L =:= emergency orelse + L =:= alert orelse + L =:= critical orelse + L =:= error orelse + L =:= warning orelse + L =:= notice orelse + L =:= info orelse + L =:= debug). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ - --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). -start_link() -> - gen_server:start_link({local, ?TRACER}, ?MODULE, [], []). - trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> - %% Dont' trace '$SYS' publish + %% Do not trace '$SYS' publish ignore; trace(publish, #message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]). -%%------------------------------------------------------------------------------ -%% Start/Stop trace -%%------------------------------------------------------------------------------ - %% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). -start_trace({client_id, ClientId}, Level, LogFile) -> - do_start_trace({client_id, ClientId}, Level, LogFile); -start_trace({topic, Topic}, Level, LogFile) -> - do_start_trace({topic, Topic}, Level, LogFile). - -do_start_trace(Who, Level, LogFile) -> - #{level := PrimaryLevel} = logger:get_primary_config(), - try logger:compare_levels(log_level(Level), PrimaryLevel) of - lt -> - {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])}; - _GtOrEq -> - gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000) - catch - _:Error -> - {error, Error} +start_trace(Who, all, LogFile) -> + start_trace(Who, debug, LogFile); +start_trace(Who, Level, LogFile) -> + case ?is_log_level(Level) of + true -> + #{level := PrimaryLevel} = logger:get_primary_config(), + try logger:compare_levels(Level, PrimaryLevel) of + lt -> + {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])}; + _GtOrEq -> + install_trace_handler(Who, Level, LogFile) + catch + _:Error -> + {error, Error} + end; + false -> {error, {invalid_log_level, Level}} end. %% @doc Stop tracing client_id or topic. -spec(stop_trace(trace_who()) -> ok | {error, term()}). -stop_trace({client_id, ClientId}) -> - gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}}); -stop_trace({topic, Topic}) -> - gen_server:call(?MODULE, {stop_trace, {topic, Topic}}). +stop_trace(Who) -> + uninstall_trance_handler(Who). %% @doc Lookup all traces -spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]). lookup_traces() -> - gen_server:call(?TRACER, lookup_traces). + lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()). -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([]) -> - {ok, #state{traces = #{}}}. - -handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) -> +install_trace_handler(Who, Level, LogFile) -> case logger:add_handler(handler_id(Who), logger_disk_log_h, - #{level => Level, - formatter => ?FORMAT, - filesync_repeat_interval => no_repeat, - config => #{type => halt, file => LogFile}, - filter_default => stop, - filters => [{meta_key_filter, - {fun filter_by_meta_key/2, Who} }]}) of + #{level => Level, + formatter => ?FORMAT, + filesync_repeat_interval => no_repeat, + config => #{type => halt, file => LogFile}, + filter_default => stop, + filters => [{meta_key_filter, + {fun filter_by_meta_key/2, Who}}]}) + of ok -> - ?LOG(info, "Start trace for ~p", [Who]), - {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; + ?LOG(info, "Start trace for ~p", [Who]); {error, Reason} -> ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), - {reply, {error, Reason}, State} - end; + {error, Reason} + end. -handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> - case maps:find(Who, Traces) of - {ok, _LogFile} -> - case logger:remove_handler(handler_id(Who)) of - ok -> - ?LOG(info, "Stop trace for ~p", [Who]); - {error, Reason} -> - ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]) - end, - {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; - error -> - {reply, {error, not_found}, State} - end; +uninstall_trance_handler(Who) -> + case logger:remove_handler(handler_id(Who)) of + ok -> + ?LOG(info, "Stop trace for ~p", [Who]); + {error, Reason} -> + ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]), + {error, Reason} + end. -handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> - {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; +filter_traces({Id, Level, Dst}, Acc) -> + case atom_to_list(Id) of + ?TOPIC_TRACE_ID(T)-> + [{?TOPIC_TRACE(T), {Level,Dst}} | Acc]; + ?CLIENT_TRACE_ID(C) -> + [{?CLIENT_TRACE(C), {Level,Dst}} | Acc]; + _ -> Acc + end. -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handler_id({topic, Topic}) -> - list_to_atom("topic_" ++ binary_to_list(Topic)); -handler_id({client_id, ClientId}) -> - list_to_atom("clientid_" ++ binary_to_list(ClientId)). +handler_id(?TOPIC_TRACE(Topic)) -> + list_to_atom(?TOPIC_TRACE_ID(str(Topic))); +handler_id(?CLIENT_TRACE(ClientId)) -> + list_to_atom(?CLIENT_TRACE_ID(str(ClientId))). filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> case maps:find(MetaKey, Meta) of @@ -181,13 +146,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> _ -> ignore end. -log_level(emergency) -> emergency; -log_level(alert) -> alert; -log_level(critical) -> critical; -log_level(error) -> error; -log_level(warning) -> warning; -log_level(notice) -> notice; -log_level(info) -> info; -log_level(debug) -> debug; -log_level(all) -> debug; -log_level(_) -> throw(invalid_log_level). +str(Bin) when is_binary(Bin) -> binary_to_list(Bin); +str(Atom) when is_atom(Atom) -> atom_to_list(Atom); +str(Str) when is_list(Str) -> Str. diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 8d516e94d..91149c901 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -43,7 +43,7 @@ start_traces(_Config) -> emqx_logger:set_log_level(debug), ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"), - {error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"), + {error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"), ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"), ct:sleep(100), @@ -53,9 +53,9 @@ start_traces(_Config) -> ?assert(filelib:is_regular("tmp/topic_trace.log")), %% Get current traces - ?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}}, - {{client_id,<<"client2">>},{all,"tmp/client2.log"}}, - {{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()), + ?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}}, + {{client_id,"client2"},{debug,"tmp/client2.log"}}, + {{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()), %% set the overall log level to debug emqx_logger:set_log_level(debug),