emqx/src/emqx_trace_handler.erl

260 lines
8.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace_handler).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Tracer]").
%% APIs
-export([ running/0
, install/3
, install/4
, uninstall/1
, uninstall/2
]).
%% For logger handler filters callbacks
-export([ filter_clientid/2
, filter_topic/2
, filter_ip_address/2
]).
-export([handler_id/2]).
-type tracer() :: #{
name := binary(),
type := clientid | topic | ip_address,
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
}.
-define(FORMAT,
{logger_formatter, #{
template => [
time, " [", level, "] ",
{clientid,
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
[{peername, [peername, " "], []}]
},
msg, "\n"
],
single_line => false,
max_size => unlimited,
depth => unlimited
}}
).
-define(CLIENT_FORMAT,
{logger_formatter, #{
template => [
time, " [", level, "] ",
{clientid,
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
[{peername, [peername, " "], []}]
},
msg, "\n"
],
single_line => false,
max_size => unlimited,
depth => unlimited
}}
).
-define(CONFIG(_LogFile_), #{
type => halt,
file => _LogFile_,
max_no_bytes => 512 * 1024 * 1024,
overload_kill_enable => true,
overload_kill_mem_size => 50 * 1024 * 1024,
overload_kill_qlen => 20000,
%% disable restart
overload_kill_restart_after => infinity
}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec install(Name :: binary() | list(),
Type :: clientid | topic | ip_address,
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all,
LogFilePath :: string()) -> ok | {error, term()}.
install(Name, Type, Filter, Level, LogFile) ->
Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)},
install(Who, Level, LogFile).
-spec install(Type :: clientid | topic | ip_address,
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all,
LogFilePath :: string()) -> ok | {error, term()}.
install(Type, Filter, Level, LogFile) ->
install(Filter, Type, Filter, Level, LogFile).
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
install(Who, all, LogFile) ->
install(Who, debug, LogFile);
install(Who, Level, LogFile) ->
PrimaryLevel = emqx_logger:get_primary_log_level(),
try logger:compare_levels(Level, PrimaryLevel) of
lt ->
{error,
io_lib:format(
"Cannot trace at a log level (~s) "
"lower than the primary log level (~s)",
[Level, PrimaryLevel]
)};
_GtOrEq ->
install_handler(Who, Level, LogFile)
catch
error:badarg ->
{error, {invalid_log_level, Level}}
end.
-spec uninstall(Type :: clientid | topic | ip_address,
Name :: binary() | list()) -> ok | {error, term()}.
uninstall(Type, Name) ->
HandlerId = handler_id(ensure_bin(Name), Type),
uninstall(HandlerId).
-spec uninstall(HandlerId :: atom()) -> ok | {error, term()}.
uninstall(HandlerId) ->
Res = logger:remove_handler(HandlerId),
show_prompts(Res, HandlerId, "Stop trace"),
Res.
%% @doc Return all running trace handlers information.
-spec running() ->
[
#{
name => binary(),
type => topic | clientid | ip_address,
id => atom(),
filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
level => logger:level(),
dst => file:filename() | console | unknown
}
].
running() ->
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
filter_clientid(_Log, _ExpectId) -> ignore.
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
case emqx_topic:match(Topic, TopicFilter) of
true -> Log;
false -> ignore
end;
filter_topic(_Log, _ExpectId) -> ignore.
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
case lists:prefix(IP, Peername) of
true -> Log;
false -> ignore
end;
filter_ip_address(_Log, _ExpectId) -> ignore.
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
HandlerId = handler_id(Name, Type),
Config = #{ level => Level,
formatter => formatter(Who),
filter_default => stop,
filters => filters(Who),
config => ?CONFIG(LogFile)
},
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
show_prompts(Res, Who, "Start trace"),
Res.
filters(#{type := clientid, filter := Filter, name := Name}) ->
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
filters(#{type := topic, filter := Filter, name := Name}) ->
[{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
filters(#{type := ip_address, filter := Filter, name := Name}) ->
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
formatter(#{type := Type}) ->
{logger_formatter,
#{
template => template(Type),
single_line => false,
max_size => unlimited,
depth => unlimited
}
}.
%% Don't log clientid since clientid only supports exact match, all client ids are the same.
%% if clientid is not latin characters. the logger_formatter restricts the output must be `~tp`
%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read.
template(clientid) ->
[time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
%% TODO better format when clientid is utf8.
template(_) ->
[ time, " [", level, "] ",
{clientid,
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
[{peername, [peername, " "], []}]
},
msg, "\n"
].
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
Init = #{id => Id, level => Level, dst => Dst},
case Filters of
[{Type, {_FilterFun, {Filter, Name}}}] when
Type =:= topic orelse
Type =:= clientid orelse
Type =:= ip_address ->
[Init#{type => Type, filter => Filter, name => Name} | Acc];
_ ->
Acc
end.
handler_id(Name, Type) ->
try
do_handler_id(Name, Type)
catch
_ : _ ->
Hash = emqx_misc:bin2hexstr_a_f_lower(crypto:hash(md5, Name)),
do_handler_id(Hash, Type)
end.
%% Handler ID must be an atom.
do_handler_id(Name, Type) ->
TypeStr = atom_to_list(Type),
NameStr = unicode:characters_to_list(Name, utf8),
FullNameStr = "trace_" ++ TypeStr ++ "_" ++ NameStr,
true = io_lib:printable_unicode_list(FullNameStr),
FullNameBin = unicode:characters_to_binary(FullNameStr, utf8),
binary_to_atom(FullNameBin, utf8).
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
ensure_bin(Bin) when is_binary(Bin) -> Bin.
ensure_list(Bin) when is_binary(Bin) -> unicode:characters_to_list(Bin, utf8);
ensure_list(List) when is_list(List) -> List.
show_prompts(ok, Who, Msg) ->
?LOG(info, Msg ++ " ~p " ++ "successfully~n", [Who]);
show_prompts({error, Reason}, Who, Msg) ->
?LOG(error, Msg ++ " ~p " ++ "failed with ~p~n", [Who, Reason]).