fix(trace): copy binary:encode_hex/2 from binary.erl

This commit is contained in:
zhongwencool 2021-12-29 22:54:38 +08:00
parent 43141dffee
commit 489fb7f806
19 changed files with 265 additions and 131 deletions

View File

@ -67,13 +67,24 @@
case logger:allow(Level, ?MODULE) of
true ->
logger:log(Level, (Data), (Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
, line => ?LINE
, line => ?LINE
}));
false ->
ok
end).
-define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)).
-define(TRACE_FILTER, emqx_trace_filter).
%% Only evaluate when necessary
-define(TRACE(Event, Msg, Meta),
begin
case persistent_term:get(?TRACE_FILTER, undefined) of
undefined -> ok;
[] -> ok;
List ->
emqx_trace:log(List, Event, Msg, Meta)
end
end).
%% print to 'user' group leader
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).

View File

@ -205,8 +205,8 @@ publish(Msg) when is_record(Msg, message) ->
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
#message{headers = #{allow_publish := false}, topic = Topic} ->
Message = emqx_message:to_log_map(Msg),
?TRACE("MQTT", "msg_publish_not_allowed", #{message => Message, topic => Topic}),
?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg),
topic => Topic}),
[];
Msg1 = #message{topic = Topic} ->
emqx_persistent_session:persist_message(Msg1),

View File

@ -375,7 +375,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
%% this is essentailly a gen_server:call implemented in emqx_connection
%% this is essentially a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
@ -390,19 +390,12 @@ kick_or_kill(Action, ConnMod, Pid) ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
_ : {timeout, {gen_server, call, _}} ->
?tp(warning, "session_kick_timeout",
#{pid => Pid,
action => Action,
stale_channel => stale_channel_info(Pid)
}),
#{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}),
ok = force_kill(Pid);
_ : Error : St ->
?tp(error, "session_kick_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
#{pid => Pid, action => Action, reason => Error, stacktrace => St,
stale_channel => stale_channel_info(Pid)}),
ok = force_kill(Pid)
end.
@ -449,21 +442,21 @@ kick_session(Action, ClientId, ChanPid) ->
, error => Error
, reason => Reason
},
#{clientid => unicode:characters_to_list(ClientId, utf8)})
#{clientid => ClientId})
end.
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] ->
?SLOG(warning, #{msg => "kicked_an_unknown_session"},
#{clientid => unicode:characters_to_list(ClientId, utf8)}),
#{clientid => ClientId}),
ok;
ChanPids ->
case length(ChanPids) > 1 of
true ->
?SLOG(warning, #{msg => "more_than_one_channel_found",
chan_pids => ChanPids},
#{clientid => unicode:characters_to_list(ClientId, utf8)});
#{clientid => ClientId});
false -> ok
end,
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
@ -480,12 +473,12 @@ with_channel(ClientId, Fun) ->
Pids -> Fun(lists:last(Pids))
end.
%% @doc Get all registed channel pids. Debugg/test interface
%% @doc Get all registered channel pids. Debug/test interface
all_channels() ->
Pat = [{{'_', '$1'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
%% @doc Get all registed clientIDs. Debugg/test interface
%% @doc Get all registered clientIDs. Debug/test interface
all_client_ids() ->
Pat = [{{'$1', '_'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
@ -513,7 +506,7 @@ lookup_channels(local, ClientId) ->
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
{badrpc, Reason} ->
%% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
%% since emqx app 4.3.10, the 'kick' and 'discard' calls handler
%% should catch all exceptions and always return 'ok'.
%% This leaves 'badrpc' only possible when there is problem
%% calling the remote node.

View File

@ -754,7 +754,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
<<>> -> ?SLOG(warning, #{
msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet, null)
packet => emqx_packet:format(Packet, hidden)
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),

View File

@ -121,7 +121,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt,
wind_time_in_ms => WindTime
}, #{clientid => unicode:characters_to_list(ClientId, utf8)}),
}, #{clientid => ClientId}),
Now = erlang:system_time(second),
Banned = #banned{who = {clientid, ClientId},
by = <<"flapping detector">>,
@ -136,7 +136,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt,
interval => Interval
}, #{clientid => unicode:characters_to_list(ClientId, utf8)})
}, #{clientid => ClientId})
end,
{noreply, State};

View File

@ -197,15 +197,7 @@ critical(Metadata, Format, Args) when is_map(Metadata) ->
set_metadata_clientid(<<>>) ->
ok;
set_metadata_clientid(ClientId) ->
try
%% try put string format client-id metadata so
%% so the log is not like <<"...">>
Id = unicode:characters_to_list(ClientId, utf8),
set_proc_metadata(#{clientid => Id})
catch
_: _->
ok
end.
set_proc_metadata(#{clientid => ClientId}).
-spec(set_metadata_peername(peername_str()) -> ok).
set_metadata_peername(Peername) ->

View File

@ -18,22 +18,77 @@
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1]).
check_config(X) -> logger_formatter:check_config(X).
format(#{msg := {report, Report}, meta := Meta} = Event, Config) when is_map(Report) ->
logger_formatter:format(Event#{msg := {report, enrich(Report, Meta)}}, Config);
format(#{msg := Msg, meta := Meta} = Event, Config) ->
NewMsg = enrich_fmt(Msg, Meta),
logger_formatter:format(Event#{msg := NewMsg}, Config).
format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) ->
Report1 = enrich_report_mfa(Report0, Meta),
Report2 = enrich_report_clientid(Report1, Meta),
Report3 = enrich_report_peername(Report2, Meta),
Report4 = enrich_report_topic(Report3, Meta),
logger_formatter:format(Event#{msg := {report, Report4}}, Config);
format(#{msg := {string, String}} = Event, Config) ->
format(Event#{msg => {"~ts ", String}}, Config);
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_mfa(Msg1, Meta),
Msg3 = enrich_topic(Msg2, Meta),
logger_formatter:format(Event#{msg := Msg3}, Config).
enrich(Report, #{mfa := Mfa, line := Line}) ->
try_format_unicode(Char) ->
List =
try
case unicode:characters_to_list(Char) of
{error, _, _} -> error;
{incomplete, _, _} -> error;
Binary -> Binary
end
catch _:_ ->
error
end,
case List of
error -> io_lib:format("~0p", [Char]);
_ -> List
end.
enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) ->
Report#{mfa => mfa(Mfa), line => Line};
enrich(Report, _) -> Report.
enrich_report_mfa(Report, _) -> Report.
enrich_fmt({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
enrich_report_clientid(Report, #{clientid := ClientId}) ->
Report#{clientid => try_format_unicode(ClientId)};
enrich_report_clientid(Report, _) -> Report.
enrich_report_peername(Report, #{peername := Peername}) ->
Report#{peername => Peername};
enrich_report_peername(Report, _) -> Report.
%% clientid and peername always in emqx_conn's process metadata.
%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
enrich_report_topic(Report, #{topic := Topic}) ->
Report#{topic => try_format_unicode(Topic)};
enrich_report_topic(Report = #{topic := Topic}, _) ->
Report#{topic => try_format_unicode(Topic)};
enrich_report_topic(Report, _) -> Report.
enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
{Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]};
enrich_fmt(Msg, _) ->
enrich_mfa(Msg, _) ->
Msg.
enrich_client_info({Fmt, Args}, #{clientid := ClientId, peername := Peer}) when is_list(Fmt) ->
{" ~ts@~ts " ++ Fmt, [ClientId, Peer | Args] };
enrich_client_info({Fmt, Args}, #{clientid := ClientId}) when is_list(Fmt) ->
{" ~ts " ++ Fmt, [ClientId | Args]};
enrich_client_info({Fmt, Args}, #{peername := Peer}) when is_list(Fmt) ->
{" ~ts " ++ Fmt, [Peer | Args]};
enrich_client_info(Msg, _) ->
Msg.
enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
{" topic: ~ts" ++ Fmt, [Topic | Args]};
enrich_topic(Msg, _) ->
Msg.
mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A).

View File

@ -48,6 +48,8 @@
, format/2
]).
-export([encode_hex/1]).
-define(TYPE_NAMES,
{ 'CONNECT'
, 'CONNACK'
@ -440,7 +442,7 @@ will_msg(#mqtt_packet_connect{clientid = ClientId,
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
%% @doc Format packet
-spec(format(emqx_types:packet(), hex | text | null) -> iolist()).
-spec(format(emqx_types:packet(), hex | text | hidden) -> iolist()).
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
HeaderIO = format_header(Header),
case format_variable(Variable, Payload, PayloadEncode) of
@ -504,11 +506,13 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId,
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}, _) ->
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
[io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=",
format_topic_filters(TopicFilters)];
format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
topic_filters = Topics}, _) ->
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
[io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=",
format_topic_filters(Topics)];
format_variable(#mqtt_packet_suback{packet_id = PacketId,
reason_codes = ReasonCodes}, _) ->
@ -527,9 +531,83 @@ format_password(undefined) -> "undefined";
format_password(_Password) -> "******".
format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])];
format_payload(Payload, hex) -> ["Payload(hex)=", binary:encode_hex(Payload)];
format_payload(_, null) -> "Payload=******".
format_payload(Payload, hex) -> ["Payload(hex)=", encode_hex(Payload)];
format_payload(_, hidden) -> "Payload=******".
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
format_topic_filters(Filters) ->
["[",
lists:join(",",
lists:map(
fun({TopicFilter, SubOpts}) ->
io_lib:format("~ts(~p)", [TopicFilter, SubOpts]);
(TopicFilter) ->
io_lib:format("~ts", [TopicFilter])
end, Filters)),
"]"].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Hex encoding functions
%% Copy from binary:encode_hex/1 (was only introduced in OTP24).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-define(HEX(X), (hex(X)):16).
-compile({inline,[hex/1]}).
-spec encode_hex(Bin) -> Bin2 when
Bin :: binary(),
Bin2 :: <<_:_*16>>.
encode_hex(Data) when byte_size(Data) rem 8 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F),?HEX(G),?HEX(H)>> || <<A,B,C,D,E,F,G,H>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 7 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F),?HEX(G)>> || <<A,B,C,D,E,F,G>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 6 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F)>> || <<A,B,C,D,E,F>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 5 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E)>> || <<A,B,C,D,E>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 4 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D)>> || <<A,B,C,D>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 3 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C)>> || <<A,B,C>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 2 =:= 0 ->
<< <<?HEX(A),?HEX(B)>> || <<A,B>> <= Data >>;
encode_hex(Data) when is_binary(Data) ->
<< <<?HEX(N)>> || <<N>> <= Data >>;
encode_hex(Bin) ->
erlang:error(badarg, [Bin]).
hex(X) ->
element(
X+1, {16#3030, 16#3031, 16#3032, 16#3033, 16#3034, 16#3035, 16#3036, 16#3037, 16#3038, 16#3039, 16#3041,
16#3042, 16#3043, 16#3044, 16#3045, 16#3046,
16#3130, 16#3131, 16#3132, 16#3133, 16#3134, 16#3135, 16#3136, 16#3137, 16#3138, 16#3139, 16#3141,
16#3142, 16#3143, 16#3144, 16#3145, 16#3146,
16#3230, 16#3231, 16#3232, 16#3233, 16#3234, 16#3235, 16#3236, 16#3237, 16#3238, 16#3239, 16#3241,
16#3242, 16#3243, 16#3244, 16#3245, 16#3246,
16#3330, 16#3331, 16#3332, 16#3333, 16#3334, 16#3335, 16#3336, 16#3337, 16#3338, 16#3339, 16#3341,
16#3342, 16#3343, 16#3344, 16#3345, 16#3346,
16#3430, 16#3431, 16#3432, 16#3433, 16#3434, 16#3435, 16#3436, 16#3437, 16#3438, 16#3439, 16#3441,
16#3442, 16#3443, 16#3444, 16#3445, 16#3446,
16#3530, 16#3531, 16#3532, 16#3533, 16#3534, 16#3535, 16#3536, 16#3537, 16#3538, 16#3539, 16#3541,
16#3542, 16#3543, 16#3544, 16#3545, 16#3546,
16#3630, 16#3631, 16#3632, 16#3633, 16#3634, 16#3635, 16#3636, 16#3637, 16#3638, 16#3639, 16#3641,
16#3642, 16#3643, 16#3644, 16#3645, 16#3646,
16#3730, 16#3731, 16#3732, 16#3733, 16#3734, 16#3735, 16#3736, 16#3737, 16#3738, 16#3739, 16#3741,
16#3742, 16#3743, 16#3744, 16#3745, 16#3746,
16#3830, 16#3831, 16#3832, 16#3833, 16#3834, 16#3835, 16#3836, 16#3837, 16#3838, 16#3839, 16#3841,
16#3842, 16#3843, 16#3844, 16#3845, 16#3846,
16#3930, 16#3931, 16#3932, 16#3933, 16#3934, 16#3935, 16#3936, 16#3937, 16#3938, 16#3939, 16#3941,
16#3942, 16#3943, 16#3944, 16#3945, 16#3946,
16#4130, 16#4131, 16#4132, 16#4133, 16#4134, 16#4135, 16#4136, 16#4137, 16#4138, 16#4139, 16#4141,
16#4142, 16#4143, 16#4144, 16#4145, 16#4146,
16#4230, 16#4231, 16#4232, 16#4233, 16#4234, 16#4235, 16#4236, 16#4237, 16#4238, 16#4239, 16#4241,
16#4242, 16#4243, 16#4244, 16#4245, 16#4246,
16#4330, 16#4331, 16#4332, 16#4333, 16#4334, 16#4335, 16#4336, 16#4337, 16#4338, 16#4339, 16#4341,
16#4342, 16#4343, 16#4344, 16#4345, 16#4346,
16#4430, 16#4431, 16#4432, 16#4433, 16#4434, 16#4435, 16#4436, 16#4437, 16#4438, 16#4439, 16#4441,
16#4442, 16#4443, 16#4444, 16#4445, 16#4446,
16#4530, 16#4531, 16#4532, 16#4533, 16#4534, 16#4535, 16#4536, 16#4537, 16#4538, 16#4539, 16#4541,
16#4542, 16#4543, 16#4544, 16#4545, 16#4546,
16#4630, 16#4631, 16#4632, 16#4633, 16#4634, 16#4635, 16#4636, 16#4637, 16#4638, 16#4639, 16#4641,
16#4642, 16#4643, 16#4644, 16#4645, 16#4646}).

View File

@ -985,13 +985,13 @@ fields("latency_stats") ->
desc => "the number of smaples for calculate the average latency of delivery"})}
];
fields("trace") ->
[ {"payload_encode", sc(hoconsc:enum([hex, text, null]), #{
[ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{
default => text,
desc => """
Determine the format of the payload format in the trace file.</br>
- `text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.</br>
- `hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.</br>
- `null`: Don't show payload in trace log file.
Determine the format of the payload format in the trace file.<br>
`text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.<br>
`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.<br>
`hidden`: payload is obfuscated as `******`
"""
})}
].

View File

@ -26,7 +26,7 @@
-export([ publish/1
, subscribe/3
, unsubscribe/2
, log/3
, log/4
]).
-export([ start_link/0
@ -52,8 +52,7 @@
-define(TRACE, ?MODULE).
-define(MAX_SIZE, 30).
-define(TRACE_FILTER, emqx_trace_filter).
-define(OWN_KEYS,[level,filters,filter_default,handlers]).
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
-ifdef(TEST).
-export([ log_file/2
@ -94,18 +93,14 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
unsubscribe(Topic, SubOpts) ->
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
log(Event, Msg, Meta0) ->
case persistent_term:get(?TRACE_FILTER, undefined) of
undefined -> ok;
List ->
Meta =
case logger:get_process_metadata() of
undefined -> Meta0;
ProcMeta -> maps:merge(ProcMeta, Meta0)
end,
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
log_filter(List, Log)
end.
log(List, Event, Msg, Meta0) ->
Meta =
case logger:get_process_metadata() of
undefined -> Meta0;
ProcMeta -> maps:merge(ProcMeta, Meta0)
end,
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
log_filter(List, Log).
log_filter([], _Log) -> ok;
log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
@ -196,7 +191,7 @@ update(Name, Enable) ->
transaction(Tran).
check() ->
erlang:send(?MODULE, {mnesia_table_event, check}).
gen_server:call(?MODULE, check).
-spec get_trace_filename(Name :: binary()) ->
{ok, FileName :: string()} | {error, not_found}.
@ -241,6 +236,9 @@ init([]) ->
update_trace_handler(),
{ok, #{timer => TRef, monitors => #{}}}.
handle_call(check, _From, State) ->
{_, NewState} = handle_info({mnesia_table_event, check}, State),
{reply, ok, NewState};
handle_call(Req, _From, State) ->
?SLOG(error, #{unexpected_call => Req}),
{reply, ok, State}.
@ -259,8 +257,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
lists:foreach(fun file:delete/1, Files),
{noreply, State#{monitors => NewMonitors}}
end;
handle_info({timeout, TRef, update_trace},
#{timer := TRef} = State) ->
handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
Traces = get_enable_trace(),
NextTRef = update_trace(Traces),
update_trace_handler(),
@ -344,10 +341,10 @@ disable_finished(Traces) ->
start_trace(Traces, Started0) ->
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
lists:foldl(fun(#?TRACE{name = Name} = Trace,
{Running, StartedAcc}) ->
case lists:member(Name, StartedAcc) of
true ->
{[Name | Running], StartedAcc};
true -> {[Name | Running], StartedAcc};
false ->
case start_trace(Trace) of
ok -> {[Name | Running], [Name | StartedAcc]};
@ -366,9 +363,11 @@ start_trace(Trace) ->
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
stop_trace(Finished, Started) ->
lists:foreach(fun(#{name := Name, type := Type}) ->
lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) ->
case lists:member(Name, Finished) of
true -> emqx_trace_handler:uninstall(Type, Name);
true ->
?TRACE("API", "trace_stopping", #{Type => Filter}),
emqx_trace_handler:uninstall(Type, Name);
false -> ok
end
end, Started).
@ -455,7 +454,7 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
case validate_ip_address(Filter) of
ok ->
Trace0 = maps:without([type, ip_address], Trace),
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter});
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)});
Error -> Error
end;
to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};

View File

@ -25,7 +25,7 @@
format(#{level := trace, event := Event, meta := Meta, msg := Msg},
#{payload_encode := PEncode}) ->
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
ClientId = maps:get(clientid, Meta, ""),
ClientId = to_iolist(maps:get(clientid, Meta, "")),
Peername = maps:get(peername, Meta, ""),
MetaBin = format_meta(Meta, PEncode),
[Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
@ -39,10 +39,7 @@ format_meta(Meta0, Encode) ->
Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0),
case Meta1 =:= #{} of
true -> [Packet, Payload];
false ->
Meta2 = lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
maps:to_list(Meta1)),
[Packet, ", ", lists:join(",", Meta2), Payload]
false -> [Packet, ", ", map_to_iolist(Meta1), Payload]
end.
format_packet(undefined, _) -> "";
@ -50,12 +47,16 @@ format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encod
format_payload(undefined, _) -> "";
format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])];
format_payload(Payload, hex) -> [", payload(hex): ", binary:encode_hex(Payload)];
format_payload(_, null) -> ", payload=******".
format_payload(Payload, hex) -> [", payload(hex): ", emqx_packet:encode_hex(Payload)];
format_payload(_, hidden) -> ", payload=******".
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);
to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]);
to_iolist(Bin)when is_binary(Bin) -> unicode:characters_to_binary(Bin);
to_iolist(List) when is_list(List) -> unicode:characters_to_list(List);
to_iolist(Term) -> io_lib:format("~0p", [Term]).
to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"];
to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char).
map_to_iolist(Map) ->
lists:join(",",
lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
maps:to_list(Map))).

View File

@ -119,11 +119,11 @@ uninstall(HandlerId) ->
running() ->
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
filter_clientid(_Log, _ExpectId) -> stop.
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
case emqx_topic:match(Topic, TopicFilter) of
true -> Log;
@ -140,7 +140,7 @@ filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
filter_ip_address(_Log, _ExpectId) -> stop.
filters(#{type := clientid, filter := Filter, name := Name}) ->
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
[{clientid, {fun ?MODULE:filter_clientid/2, {Filter, Name}}}];
filters(#{type := topic, filter := Filter, name := Name}) ->
[{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
filters(#{type := ip_address, filter := Filter, name := Name}) ->
@ -149,8 +149,9 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
formatter(#{type := _Type}) ->
{emqx_trace_formatter,
#{
template => [],
single_line => false,
%% template is for ?SLOG message not ?TRACE.
template => [time," [",level,"] ", msg,"\n"],
single_line => true,
max_size => unlimited,
depth => unlimited,
payload_encode => payload_encode()

View File

@ -32,19 +32,22 @@ all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([emqx_modules]),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_modules]).
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(t_trace_clientid, Config) ->
init(),
Config;
init_per_testcase(_Case, Config) ->
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
init(),
Config.
end_per_testcase(_Case, _Config) ->
terminate(),
ok.
t_trace_clientid(_Config) ->
@ -66,11 +69,11 @@ t_trace_clientid(_Config) ->
?assert(filelib:is_regular("tmp/client3.log")),
%% Get current traces
?assertMatch([#{type := clientid, filter := "client", name := <<"CLI-client1">>,
?assertMatch([#{type := clientid, filter := <<"client">>, name := <<"CLI-client1">>,
level := debug, dst := "tmp/client.log"},
#{type := clientid, filter := "client2", name := <<"CLI-client2">>
#{type := clientid, filter := <<"client2">>, name := <<"CLI-client2">>
, level := debug, dst := "tmp/client2.log"},
#{type := clientid, filter := "client3", name := <<"CLI-client3">>,
#{type := clientid, filter := <<"client3">>, name := <<"CLI-client3">>,
level := debug, dst := "tmp/client3.log"}
], emqx_trace_handler:running()),
@ -231,3 +234,9 @@ filesync(Name0, Type, Retry) ->
ct:sleep(100),
filesync(Name, Type, Retry - 1)
end.
init() ->
emqx_trace:start_link().
terminate() ->
catch ok = gen_server:stop(emqx_trace, normal, 5000).

View File

@ -43,6 +43,7 @@ groups() ->
[].
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
emqx_authn_test_lib:delete_authenticators(
[?CONF_NS_ATOM],
?GLOBAL),
@ -55,8 +56,9 @@ init_per_testcase(_, Config) ->
Config.
init_per_suite(Config) ->
_ = application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authn, emqx_dashboard],
[emqx_authn, emqx_dashboard],
fun set_special_configs/1),
?AUTHN:delete_chain(?GLOBAL),
@ -65,7 +67,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_authn, emqx_dashboard]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authn]),
ok.
set_special_configs(emqx_dashboard) ->

View File

@ -792,16 +792,7 @@ do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
}};
do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
{emqx_logger_textfmt,
#{template =>
[time," [",level,"] ",
{clientid,
[{peername,
[clientid,"@",peername," "],
[clientid, " "]}],
[{peername,
[peername," "],
[]}]},
msg,"\n"],
#{template => [time," [",level,"] ", msg,"\n"],
chars_limit => CharsLimit,
single_line => SingleLine,
time_offset => TimeOffSet,

View File

@ -88,7 +88,8 @@ on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
{prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
{prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
end,
?TRACE("QUERY", "postgresql_connector_received", #{connector => InstId, command => Command, args => Args, state => State}}),
?TRACE("QUERY", "postgresql_connector_received",
#{connector => InstId, command => Command, args => Args, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
{error, Reason} ->
?SLOG(error, #{

View File

@ -149,7 +149,7 @@ api_key(post, #{body := App}) ->
Desc = unicode:characters_to_binary(Desc0, unicode),
case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
{ok, NewApp} -> {200, format(NewApp)};
{error, Reason} -> {400, Reason}
{error, Reason} -> {400, io_lib:format("~p", [Reason])}
end.
api_key_by_name(get, #{bindings := #{name := Name}}) ->

View File

@ -129,11 +129,10 @@ schema("/trace/:name/log") ->
hoconsc:ref(position),
hoconsc:ref(node)
],
%% todo response data
responses => #{
200 =>
[
{items, hoconsc:mk(binary(), #{example => "BinBinBin"})}
{items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})}
| fields(bytes) ++ fields(position)
]
}
@ -307,7 +306,7 @@ download_trace_log(get, #{bindings := #{name := Name}}) ->
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
emqx_trace:delete_files_after_send(ZipFileName, Zips),
Headers = #{
<<"content-type">> => <<"application/octet-stream">>,
<<"content-type">> => <<"application/x-zip">>,
<<"content-disposition">> =>
iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile))
},

View File

@ -18,6 +18,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_mgmt.hrl").
@ -386,19 +387,19 @@ trace(["list"]) ->
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
end, emqx_trace_handler:running());
trace(["stop", Operation, ClientId]) ->
case trace_type(Operation) of
{ok, Type} -> trace_off(Type, ClientId);
trace(["stop", Operation, Filter0]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} -> trace_off(Type, Filter);
error -> trace([])
end;
trace(["start", Operation, ClientId, LogFile]) ->
trace(["start", Operation, ClientId, LogFile, "all"]);
trace(["start", Operation, Filter, LogFile, Level]) ->
case trace_type(Operation) of
{ok, Type} ->
trace_on(name(Filter), Type, Filter,
trace(["start", Operation, Filter0, LogFile, Level]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} ->
trace_on(name(Filter0), Type, Filter,
list_to_existing_atom(Level), LogFile);
error -> trace([])
end;
@ -428,13 +429,14 @@ trace_on(Name, Type, Filter, Level, LogFile) ->
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error])
end.
trace_off(Who, Filter) ->
case emqx_trace_handler:uninstall(Who, name(Filter)) of
trace_off(Type, Filter) ->
?TRACE("CLI", "trace_stopping", #{Type => Filter}),
case emqx_trace_handler:uninstall(Type, name(Filter)) of
ok ->
emqx_trace:check(),
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Filter]);
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]);
{error, Error} ->
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Filter, Error])
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error])
end.
%%--------------------------------------------------------------------
@ -463,9 +465,9 @@ traces(["delete", Name]) ->
traces(["start", Name, Operation, Filter]) ->
traces(["start", Name, Operation, Filter, "900"]);
traces(["start", Name, Operation, Filter, DurationS]) ->
case trace_type(Operation) of
{ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS);
traces(["start", Name, Operation, Filter0, DurationS]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
error -> traces([])
end;
@ -507,10 +509,10 @@ trace_cluster_off(Name) ->
{error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
end.
trace_type("client") -> {ok, clientid};
trace_type("topic") -> {ok, topic};
trace_type("ip_address") -> {ok, ip_address};
trace_type(_) -> error.
trace_type("client", ClientId) -> {ok, clientid, list_to_binary(ClientId)};
trace_type("topic", Topic) -> {ok, topic, list_to_binary(Topic)};
trace_type("ip_address", IP) -> {ok, ip_address, IP};
trace_type(_, _) -> error.
%%--------------------------------------------------------------------
%% @doc Listeners Command