Merge pull request #2751 from emqx/reliable_tracer

Make emqx_tracer more reliable
This commit is contained in:
turtleDeng 2019-08-16 17:46:27 +08:00 committed by GitHub
commit ffef64a803
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 118 deletions

View File

@ -30,8 +30,7 @@ init([]) ->
child_spec(emqx_stats, worker),
child_spec(emqx_metrics, worker),
child_spec(emqx_ctl, worker),
child_spec(emqx_zone, worker),
child_spec(emqx_tracer, worker)]}}.
child_spec(emqx_zone, worker)]}}.
child_spec(M, worker) ->
#{id => M,

View File

@ -14,34 +14,19 @@
-module(emqx_tracer).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Tracer]").
%% APIs
-export([start_link/0]).
-export([ trace/2
, start_trace/3
, lookup_traces/0
, stop_trace/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {traces}).
-type(trace_who() :: {client_id | topic, binary()}).
-type(trace_who() :: {client_id | topic, binary() | list()}).
-define(TRACER, ?MODULE).
-define(FORMAT, {emqx_logger_formatter,
@ -55,120 +40,100 @@
[peername," "],
[]}]},
msg,"\n"]}}).
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
-define(TOPIC_TRACE(T), {topic,T}).
-define(CLIENT_TRACE(C), {client_id,C}).
%% Evaluates to true when L is exactly one of the eight severity atoms listed
%% below (emergency through debug) and to false otherwise. The atom `all' is
%% not one of them.
%% NOTE(review): the expansion is a bare orelse-chain without surrounding
%% parentheses, so use it as a stand-alone expression rather than combining it
%% with other operators at the call site.
-define(is_log_level(L),
L =:= emergency orelse
L =:= alert orelse
L =:= critical orelse
L =:= error orelse
L =:= warning orelse
L =:= notice orelse
L =:= info orelse
L =:= debug).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).
%% Emit an info-level log entry for a published message.
%% Messages whose topic starts with "$SYS/" are skipped and yield `ignore'.
%% Any other message is only accepted when its `from' field is a binary or an
%% atom; other values match no clause.
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
%% Do not trace '$SYS' publishes
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
%% NOTE(review): the #{topic => Topic} map is presumably attached as log
%% metadata so that topic trace filters can match on it - confirm against
%% emqx_logger:info/3.
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
%%------------------------------------------------------------------------------
%% Start/Stop trace
%%------------------------------------------------------------------------------
%% @doc Start to trace client_id or topic.
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
start_trace({client_id, ClientId}, Level, LogFile) ->
do_start_trace({client_id, ClientId}, Level, LogFile);
start_trace({topic, Topic}, Level, LogFile) ->
do_start_trace({topic, Topic}, Level, LogFile).
do_start_trace(Who, Level, LogFile) ->
#{level := PrimaryLevel} = logger:get_primary_config(),
try logger:compare_levels(log_level(Level), PrimaryLevel) of
lt ->
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
_GtOrEq ->
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
catch
_:Error ->
{error, Error}
%% The level `all' is an alias for `debug'. Any other level must satisfy
%% ?is_log_level/1, otherwise {error, {invalid_log_level, Level}} is returned.
%% A level that compares lower than the primary logger level is rejected with
%% a formatted message; otherwise the trace handler is installed and its
%% result is returned.
start_trace(Who, all, LogFile) ->
    start_trace(Who, debug, LogFile);
start_trace(Who, Level, LogFile) ->
    case ?is_log_level(Level) of
        false ->
            {error, {invalid_log_level, Level}};
        true ->
            #{level := PrimaryLevel} = logger:get_primary_config(),
            %% Only the level comparison is protected by the try; errors
            %% raised while formatting the message or installing the handler
            %% propagate to the caller.
            Compared =
                try
                    {ok, logger:compare_levels(Level, PrimaryLevel)}
                catch
                    _:Error -> {error, Error}
                end,
            case Compared of
                {ok, lt} ->
                    {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
                {ok, _GtOrEq} ->
                    install_trace_handler(Who, Level, LogFile);
                {error, _} = Failure ->
                    Failure
            end
    end.
%% @doc Stop tracing client_id or topic.
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
stop_trace({client_id, ClientId}) ->
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
stop_trace({topic, Topic}) ->
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
%% Delegates to uninstall_trance_handler/1 and returns its result unchanged.
stop_trace(Who) ->
uninstall_trance_handler(Who).
%% @doc Lookup all traces
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
lookup_traces() ->
gen_server:call(?TRACER, lookup_traces).
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
{ok, #state{traces = #{}}}.
handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) ->
install_trace_handler(Who, Level, LogFile) ->
case logger:add_handler(handler_id(Who), logger_disk_log_h,
#{level => Level,
formatter => ?FORMAT,
filesync_repeat_interval => no_repeat,
config => #{type => halt, file => LogFile},
filter_default => stop,
filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who} }]}) of
#{level => Level,
formatter => ?FORMAT,
filesync_repeat_interval => no_repeat,
config => #{type => halt, file => LogFile},
filter_default => stop,
filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who}}]})
of
ok ->
?LOG(info, "Start trace for ~p", [Who]),
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
?LOG(info, "Start trace for ~p", [Who]);
{error, Reason} ->
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
{reply, {error, Reason}, State}
end;
{error, Reason}
end.
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
case maps:find(Who, Traces) of
{ok, _LogFile} ->
case logger:remove_handler(handler_id(Who)) of
ok ->
?LOG(info, "Stop trace for ~p", [Who]);
{error, Reason} ->
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason])
end,
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
error ->
{reply, {error, not_found}, State}
end;
%% Remove the logger handler whose id is derived from Who via handler_id/1.
%% On success the outcome is logged at info level and the value of that log
%% call is returned; on failure the error is logged and {error, Reason} is
%% handed back to the caller.
uninstall_trance_handler(Who) ->
    HandlerId = handler_id(Who),
    case logger:remove_handler(HandlerId) of
        {error, Reason} = Failure ->
            ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
            Failure;
        ok ->
            ?LOG(info, "Stop trace for ~p", [Who])
    end.
handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
%% Fold step used by lookup_traces/0: keeps only handlers whose id carries one
%% of the trace prefixes and prepends {Who, {Level, Dst}} to the accumulator.
%% The name recovered from the handler id is a string (list), not a binary.
filter_traces({Id, Level, Dst}, Acc) ->
    Info = {Level, Dst},
    case atom_to_list(Id) of
        ?CLIENT_TRACE_ID(ClientId) ->
            [{?CLIENT_TRACE(ClientId), Info} | Acc];
        ?TOPIC_TRACE_ID(Topic) ->
            [{?TOPIC_TRACE(Topic), Info} | Acc];
        _NotATrace ->
            Acc
    end.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handler_id({topic, Topic}) ->
list_to_atom("topic_" ++ binary_to_list(Topic));
handler_id({client_id, ClientId}) ->
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
%% Build the logger handler id for a trace target by prefixing the target's
%% name (converted with str/1) and turning the result into an atom.
%% Every distinct topic or client id therefore creates a new atom via
%% list_to_atom/1.
handler_id(?TOPIC_TRACE(Topic)) ->
    TopicId = ?TOPIC_TRACE_ID(str(Topic)),
    list_to_atom(TopicId);
handler_id(?CLIENT_TRACE(ClientId)) ->
    ClientTraceId = ?CLIENT_TRACE_ID(str(ClientId)),
    list_to_atom(ClientTraceId).
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
case maps:find(MetaKey, Meta) of
@ -181,13 +146,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
_ -> ignore
end.
log_level(emergency) -> emergency;
log_level(alert) -> alert;
log_level(critical) -> critical;
log_level(error) -> error;
log_level(warning) -> warning;
log_level(notice) -> notice;
log_level(info) -> info;
log_level(debug) -> debug;
log_level(all) -> debug;
log_level(_) -> throw(invalid_log_level).
%% Convert a binary, atom or list to a list (string).
%% Lists are returned as-is, atoms via atom_to_list/1 and binaries via
%% binary_to_list/1 (one list element per byte). Any other term matches no
%% clause.
str(Value) when is_list(Value) -> Value;
str(Value) when is_atom(Value) -> atom_to_list(Value);
str(Value) when is_binary(Value) -> binary_to_list(Value).

View File

@ -43,7 +43,7 @@ start_traces(_Config) ->
emqx_logger:set_log_level(debug),
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
{error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
ct:sleep(100),
@ -53,9 +53,9 @@ start_traces(_Config) ->
?assert(filelib:is_regular("tmp/topic_trace.log")),
%% Get current traces
?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
{{client_id,<<"client2">>},{all,"tmp/client2.log"}},
{{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}},
{{client_id,"client2"},{debug,"tmp/client2.log"}},
{{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
%% set the overall log level to debug
emqx_logger:set_log_level(debug),