emqx/src/emqx_tracer.erl

154 lines
5.5 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_tracer).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Tracer]").
%% APIs
-export([ trace/2
, start_trace/3
, lookup_traces/0
, stop_trace/1
]).
%% What is being traced: either a client id or a topic, given as a binary
%% or as a string (list).
-type(trace_who() :: {clientid | topic, binary() | list()}).
-define(TRACER, ?MODULE).
%% Formatter configuration attached to every trace handler (see
%% install_trace_handler/3). The template lists, in order: time, level in
%% square brackets, an optional client/peer prefix, the message and a newline.
%% NOTE(review): the nested {Key, IfPresent, IfAbsent} tuples look like the
%% OTP logger_formatter template convention (print "clientid@peername " when
%% both are in the metadata, otherwise whichever one is present); confirm
%% against emqx_logger_formatter, which is not visible here.
-define(FORMAT, {emqx_logger_formatter,
#{template =>
[time," [",level,"] ",
{clientid,
[{peername,
[clientid,"@",peername," "],
[clientid, " "]}],
[{peername,
[peername," "],
[]}]},
msg,"\n"]}}).
%% String form of the logger handler id used for a topic / client trace.
%% The expansion is `"prefix" ++ X', so the same macro serves both to build
%% an id (handler_id/1) and as a pattern that strips the prefix
%% (filter_traces/2).
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
%% Constructors / patterns for the two shapes of trace_who().
-define(TOPIC_TRACE(T), {topic,T}).
-define(CLIENT_TRACE(C), {clientid,C}).
%% Evaluates to true when L is one of the eight log levels listed below.
%% The expansion is not wrapped in parentheses, so use it only where it forms
%% a complete expression or guard on its own.
-define(is_log_level(L),
L =:= emergency orelse
L =:= alert orelse
L =:= critical orelse
L =:= error orelse
L =:= warning orelse
L =:= notice orelse
L =:= info orelse
L =:= debug).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Emit an info-level log entry for a published message.
%% Messages published to a topic starting with `$SYS/' are skipped and the
%% function returns `ignore'. Any other publish whose `from' field is a
%% binary or an atom is logged together with its topic and payload.
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
    %% '$SYS' publishes are never traced
    ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
  when is_binary(From) orelse is_atom(From) ->
    %% The topic is handed to emqx_logger alongside the MFA; presumably this
    %% map becomes the log metadata that filter_by_meta_key/2 inspects
    %% (emqx_logger is not visible here -- confirm).
    Meta = #{topic => Topic,
             mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
    emqx_logger:info(Meta, "PUBLISH to ~s: ~0p", [Topic, Payload]).
%% @doc Start tracing a client id or a topic into `LogFile'.
%%
%% `Level' is one of the log levels accepted by ?is_log_level, or the atom
%% `all', which is treated as `debug'. The requested level is compared with
%% the primary logger level: a level lower than the primary one is rejected,
%% since such events would never reach the trace handler.
%%
%% Returns `ok' on success, or `{error, Reason}' when
%%   - the level is not a known log level (`{invalid_log_level, Level}'),
%%   - the level is lower than the primary level (Reason is io_lib chardata),
%%   - comparing the levels raised an exception (Reason is the exception term),
%%   - the trace handler could not be installed.
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
start_trace(Who, all, LogFile) ->
    start_trace(Who, debug, LogFile);
start_trace(Who, Level, LogFile) when ?is_log_level(Level) ->
    #{level := PrimaryLevel} = logger:get_primary_config(),
    %% Only the level comparison is protected by the try; failures while
    %% installing the handler are reported through its own return value.
    try logger:compare_levels(Level, PrimaryLevel) of
        lt ->
            {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)",
                                  [Level, PrimaryLevel])};
        _GtOrEq ->
            install_trace_handler(Who, Level, LogFile)
    catch
        _:Error ->
            {error, Error}
    end;
start_trace(_Who, Level, _LogFile) ->
    {error, {invalid_log_level, Level}}.
%% @doc Stop a running trace for the given client id or topic.
%% Delegates to uninstall_trance_handler/1, which removes the logger handler
%% registered for `Who'; returns `ok' or `{error, Reason}'.
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
stop_trace(Who) ->
    uninstall_trance_handler(Who).
%% @doc List all active traces.
%%
%% Every log handler reported by emqx_logger:get_log_handlers/0 whose id
%% carries one of the trace prefixes is returned as `{Who, {Level, Dst}}'.
%% `Who' is rebuilt from the handler id, so its second element is always a
%% string (list) even if the trace was started with a binary. Handlers are
%% accumulated by prepending, so the result is in reverse handler order.
%% `Dst' is whatever destination emqx_logger reports for the handler
%% (presumably the log file name -- emqx_logger is not visible here).
-spec(lookup_traces() -> [{Who :: trace_who(), {Level :: logger:level(), Dst :: term()}}]).
lookup_traces() ->
    lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
%% Register a logger_disk_log_h handler that writes the events matching
%% `Who' to `LogFile' at `Level'.
%%
%% The handler drops everything by default (`filter_default => stop') and
%% lets through only the events accepted by filter_by_meta_key/2.
%% Returns `ok' on success or `{error, Reason}' as reported by
%% logger:add_handler/3; both outcomes are logged.
install_trace_handler(Who, Level, LogFile) ->
    HandlerConfig =
        #{level => Level,
          formatter => ?FORMAT,
          %% NOTE(review): the OTP docs describe filesync_repeat_interval as
          %% a key of the nested `config' map; at this level it looks like it
          %% is not picked up by logger_disk_log_h. Left in place to keep
          %% behaviour unchanged -- confirm the intent.
          filesync_repeat_interval => no_repeat,
          config => #{type => halt, file => LogFile},
          filter_default => stop,
          filters => [{meta_key_filter,
                       {fun filter_by_meta_key/2, Who}}]},
    case logger:add_handler(handler_id(Who), logger_disk_log_h, HandlerConfig) of
        ok ->
            ?LOG(info, "Start trace for ~p", [Who]),
            %% Return ok explicitly instead of relying on the value of ?LOG.
            ok;
        {error, Reason} ->
            ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
            {error, Reason}
    end.
%% Remove the logger handler that was registered for `Who'.
%% Returns `ok' on success or `{error, Reason}' as reported by
%% logger:remove_handler/1; both outcomes are logged.
uninstall_trance_handler(Who) ->
    case logger:remove_handler(handler_id(Who)) of
        ok ->
            ?LOG(info, "Stop trace for ~p", [Who]),
            %% Return ok explicitly instead of relying on the value of ?LOG.
            ok;
        {error, Reason} ->
            ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
            {error, Reason}
    end.
%% Fold step for lookup_traces/0.
%% Takes one `{HandlerId, Level, Dst}' handler tuple and, when the id starts
%% with a topic or client trace prefix, prepends `{Who, {Level, Dst}}' to the
%% accumulator. Handlers with any other id leave the accumulator untouched.
filter_traces({HandlerId, Level, Dst}, Traces) ->
    Info = {Level, Dst},
    case atom_to_list(HandlerId) of
        ?TOPIC_TRACE_ID(Topic) ->
            [{?TOPIC_TRACE(Topic), Info} | Traces];
        ?CLIENT_TRACE_ID(ClientId) ->
            [{?CLIENT_TRACE(ClientId), Info} | Traces];
        _NotATrace ->
            Traces
    end.
%% Build the logger handler id (an atom) for a trace target by prefixing the
%% string form of the topic / client id.
%% Note: list_to_atom/1 creates a new atom for every distinct target, and
%% atoms are never garbage collected, so targets should come from a trusted,
%% bounded source.
handler_id(?TOPIC_TRACE(Topic)) ->
    TopicHandler = ?TOPIC_TRACE_ID(str(Topic)),
    list_to_atom(TopicHandler);
handler_id(?CLIENT_TRACE(ClientId)) ->
    ClientHandler = ?CLIENT_TRACE_ID(str(ClientId)),
    list_to_atom(ClientHandler).
%% Logger filter fun: keep a log event only if its metadata matches the
%% traced target `{MetaKey, MetaValue}'.
%%   - metadata value equal to MetaValue      -> the event is returned;
%%   - MetaKey is `topic' and the metadata topic matches MetaValue according
%%     to emqx_topic:match/2                  -> the event is returned;
%%   - anything else (including a missing key) -> `ignore'.
filter_by_meta_key(#{meta := Meta} = LogEvent, {MetaKey, MetaValue}) ->
    do_filter_by_meta_key(maps:find(MetaKey, Meta), MetaKey, MetaValue, LogEvent).

%% Decide on the result of the metadata lookup; see filter_by_meta_key/2.
do_filter_by_meta_key({ok, Expected}, _MetaKey, Expected, LogEvent) ->
    %% exact match on the traced value
    LogEvent;
do_filter_by_meta_key({ok, Topic}, topic, Filter, LogEvent) ->
    case emqx_topic:match(Topic, Filter) of
        true -> LogEvent;
        false -> ignore
    end;
do_filter_by_meta_key(_LookupResult, _MetaKey, _Expected, _LogEvent) ->
    ignore.
%% Convert a binary, atom or string to a string (list).
%% Lists are returned unchanged; binaries are converted byte by byte with
%% binary_to_list/1; any other term fails with function_clause.
str(Chars) when is_list(Chars) -> Chars;
str(Bytes) when is_binary(Bytes) -> binary_to_list(Bytes);
str(Name) when is_atom(Name) -> atom_to_list(Name).