Merge pull request #9861 from zhongwencool/log-meta-seq
feat: optimize the log output format
This commit is contained in:
commit
dc82de0eda
|
@ -48,9 +48,9 @@
|
||||||
-define(TRACE(Level, Tag, Msg, Meta), begin
|
-define(TRACE(Level, Tag, Msg, Meta), begin
|
||||||
case persistent_term:get(?TRACE_FILTER, []) of
|
case persistent_term:get(?TRACE_FILTER, []) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
%% We can't bind filter list to a variablebecause we pollute the calling scope with it.
|
%% We can't bind filter list to a variable because we pollute the calling scope with it.
|
||||||
%% We also don't want to wrap the macro body in a fun
|
%% We also don't want to wrap the macro body in a fun
|
||||||
%% beacause this adds overhead to the happy path.
|
%% because this adds overhead to the happy path.
|
||||||
%% So evaluate `persistent_term:get` twice.
|
%% So evaluate `persistent_term:get` twice.
|
||||||
_ -> emqx_trace:log(persistent_term:get(?TRACE_FILTER, []), Msg, (Meta)#{trace_tag => Tag})
|
_ -> emqx_trace:log(persistent_term:get(?TRACE_FILTER, []), Msg, (Meta)#{trace_tag => Tag})
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -152,7 +152,7 @@ start_link() ->
|
||||||
insert_channel_info(ClientId, Info, Stats) ->
|
insert_channel_info(ClientId, Info, Stats) ->
|
||||||
Chan = {ClientId, self()},
|
Chan = {ClientId, self()},
|
||||||
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
|
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
|
||||||
?tp(debug, insert_channel_info, #{client_id => ClientId}),
|
?tp(debug, insert_channel_info, #{clientid => ClientId}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|
|
@ -550,6 +550,7 @@ handle_msg(
|
||||||
},
|
},
|
||||||
handle_incoming(Packet, NState);
|
handle_incoming(Packet, NState);
|
||||||
handle_msg({incoming, Packet}, State) ->
|
handle_msg({incoming, Packet}, State) ->
|
||||||
|
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||||
handle_incoming(Packet, State);
|
handle_incoming(Packet, State);
|
||||||
handle_msg({outgoing, Packets}, State) ->
|
handle_msg({outgoing, Packets}, State) ->
|
||||||
handle_outgoing(Packets, State);
|
handle_outgoing(Packets, State);
|
||||||
|
@ -731,6 +732,12 @@ handle_timeout(TRef, Msg, State) ->
|
||||||
%% Parse incoming data
|
%% Parse incoming data
|
||||||
-compile({inline, [when_bytes_in/3]}).
|
-compile({inline, [when_bytes_in/3]}).
|
||||||
when_bytes_in(Oct, Data, State) ->
|
when_bytes_in(Oct, Data, State) ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "raw_bin_received",
|
||||||
|
size => Oct,
|
||||||
|
bin => binary_to_list(binary:encode_hex(Data)),
|
||||||
|
type => "hex"
|
||||||
|
}),
|
||||||
{Packets, NState} = parse_incoming(Data, [], State),
|
{Packets, NState} = parse_incoming(Data, [], State),
|
||||||
Len = erlang:length(Packets),
|
Len = erlang:length(Packets),
|
||||||
check_limiter(
|
check_limiter(
|
||||||
|
@ -783,7 +790,6 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
|
|
||||||
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
|
|
||||||
with_channel(handle_in, [Packet], State);
|
with_channel(handle_in, [Packet], State);
|
||||||
handle_incoming(FrameError, State) ->
|
handle_incoming(FrameError, State) ->
|
||||||
with_channel(handle_in, [FrameError], State).
|
with_channel(handle_in, [FrameError], State).
|
||||||
|
|
|
@ -22,20 +22,49 @@
|
||||||
|
|
||||||
check_config(X) -> logger_formatter:check_config(X).
|
check_config(X) -> logger_formatter:check_config(X).
|
||||||
|
|
||||||
format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) ->
|
format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) ->
|
||||||
Report1 = enrich_report_mfa(Report0, Meta),
|
Report = enrich_report(ReportMap, Meta),
|
||||||
Report2 = enrich_report_clientid(Report1, Meta),
|
logger_formatter:format(Event#{msg := {report, Report}}, Config);
|
||||||
Report3 = enrich_report_peername(Report2, Meta),
|
|
||||||
Report4 = enrich_report_topic(Report3, Meta),
|
|
||||||
logger_formatter:format(Event#{msg := {report, Report4}}, Config);
|
|
||||||
format(#{msg := {string, String}} = Event, Config) ->
|
format(#{msg := {string, String}} = Event, Config) ->
|
||||||
format(Event#{msg => {"~ts ", [String]}}, Config);
|
format(Event#{msg => {"~ts ", [String]}}, Config);
|
||||||
|
%% trace
|
||||||
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
|
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
|
||||||
Msg1 = enrich_client_info(Msg0, Meta),
|
Msg1 = enrich_client_info(Msg0, Meta),
|
||||||
Msg2 = enrich_mfa(Msg1, Meta),
|
Msg2 = enrich_mfa(Msg1, Meta),
|
||||||
Msg3 = enrich_topic(Msg2, Meta),
|
Msg3 = enrich_topic(Msg2, Meta),
|
||||||
logger_formatter:format(Event#{msg := Msg3}, Config).
|
logger_formatter:format(Event#{msg := Msg3}, Config).
|
||||||
|
|
||||||
|
enrich_report(ReportRaw, Meta) ->
|
||||||
|
%% clientid and peername always in emqx_conn's process metadata.
|
||||||
|
%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
|
||||||
|
Topic =
|
||||||
|
case maps:get(topic, Meta, undefined) of
|
||||||
|
undefined -> maps:get(topic, ReportRaw, undefined);
|
||||||
|
Topic0 -> Topic0
|
||||||
|
end,
|
||||||
|
ClientId = maps:get(clientid, Meta, undefined),
|
||||||
|
Peer = maps:get(peername, Meta, undefined),
|
||||||
|
MFA = maps:get(mfa, Meta, undefined),
|
||||||
|
Line = maps:get(line, Meta, undefined),
|
||||||
|
Msg = maps:get(msg, ReportRaw, undefined),
|
||||||
|
lists:foldl(
|
||||||
|
fun
|
||||||
|
({_, undefined}, Acc) -> Acc;
|
||||||
|
(Item, Acc) -> [Item | Acc]
|
||||||
|
end,
|
||||||
|
maps:to_list(maps:without([topic, msg, clientid], ReportRaw)),
|
||||||
|
[
|
||||||
|
{topic, try_format_unicode(Topic)},
|
||||||
|
{clientid, try_format_unicode(ClientId)},
|
||||||
|
{peername, Peer},
|
||||||
|
{line, Line},
|
||||||
|
{mfa, mfa(MFA)},
|
||||||
|
{msg, Msg}
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
||||||
|
try_format_unicode(undefined) ->
|
||||||
|
undefined;
|
||||||
try_format_unicode(Char) ->
|
try_format_unicode(Char) ->
|
||||||
List =
|
List =
|
||||||
try
|
try
|
||||||
|
@ -53,30 +82,6 @@ try_format_unicode(Char) ->
|
||||||
_ -> List
|
_ -> List
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) ->
|
|
||||||
Report#{mfa => mfa(Mfa), line => Line};
|
|
||||||
enrich_report_mfa(Report, _) ->
|
|
||||||
Report.
|
|
||||||
|
|
||||||
enrich_report_clientid(Report, #{clientid := ClientId}) ->
|
|
||||||
Report#{clientid => try_format_unicode(ClientId)};
|
|
||||||
enrich_report_clientid(Report, _) ->
|
|
||||||
Report.
|
|
||||||
|
|
||||||
enrich_report_peername(Report, #{peername := Peername}) ->
|
|
||||||
Report#{peername => Peername};
|
|
||||||
enrich_report_peername(Report, _) ->
|
|
||||||
Report.
|
|
||||||
|
|
||||||
%% clientid and peername always in emqx_conn's process metadata.
|
|
||||||
%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
|
|
||||||
enrich_report_topic(Report, #{topic := Topic}) ->
|
|
||||||
Report#{topic => try_format_unicode(Topic)};
|
|
||||||
enrich_report_topic(Report = #{topic := Topic}, _) ->
|
|
||||||
Report#{topic => try_format_unicode(Topic)};
|
|
||||||
enrich_report_topic(Report, _) ->
|
|
||||||
Report.
|
|
||||||
|
|
||||||
enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
|
enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
|
||||||
{Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]};
|
{Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]};
|
||||||
enrich_mfa(Msg, _) ->
|
enrich_mfa(Msg, _) ->
|
||||||
|
@ -96,4 +101,5 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
|
||||||
enrich_topic(Msg, _) ->
|
enrich_topic(Msg, _) ->
|
||||||
Msg.
|
Msg.
|
||||||
|
|
||||||
mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A).
|
mfa(undefined) -> undefined;
|
||||||
|
mfa({M, F, A}) -> [atom_to_list(M), ":", atom_to_list(F), "/" ++ integer_to_list(A)].
|
||||||
|
|
|
@ -477,9 +477,13 @@ format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
|
||||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
|
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
|
||||||
HeaderIO = format_header(Header),
|
HeaderIO = format_header(Header),
|
||||||
case format_variable(Variable, Payload, PayloadEncode) of
|
case format_variable(Variable, Payload, PayloadEncode) of
|
||||||
"" -> HeaderIO;
|
"" -> [HeaderIO, ")"];
|
||||||
VarIO -> [HeaderIO, ",", VarIO]
|
VarIO -> [HeaderIO, ", ", VarIO, ")"]
|
||||||
end.
|
end;
|
||||||
|
%% receive a frame error packet, such as {frame_error,frame_too_large} or
|
||||||
|
%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}}
|
||||||
|
format(FrameError, _PayloadEncode) ->
|
||||||
|
lists:flatten(io_lib:format("~tp", [FrameError])).
|
||||||
|
|
||||||
format_header(#mqtt_packet_header{
|
format_header(#mqtt_packet_header{
|
||||||
type = Type,
|
type = Type,
|
||||||
|
@ -487,14 +491,14 @@ format_header(#mqtt_packet_header{
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
retain = Retain
|
retain = Retain
|
||||||
}) ->
|
}) ->
|
||||||
io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]).
|
io_lib:format("~ts(Q~p, R~p, D~p", [type_name(Type), QoS, i(Retain), i(Dup)]).
|
||||||
|
|
||||||
format_variable(undefined, _, _) ->
|
format_variable(undefined, _, _) ->
|
||||||
"";
|
"";
|
||||||
format_variable(Variable, undefined, PayloadEncode) ->
|
format_variable(Variable, undefined, PayloadEncode) ->
|
||||||
format_variable(Variable, PayloadEncode);
|
format_variable(Variable, PayloadEncode);
|
||||||
format_variable(Variable, Payload, PayloadEncode) ->
|
format_variable(Variable, Payload, PayloadEncode) ->
|
||||||
[format_variable(Variable, PayloadEncode), ",", format_payload(Payload, PayloadEncode)].
|
[format_variable(Variable, PayloadEncode), ", ", format_payload(Payload, PayloadEncode)].
|
||||||
|
|
||||||
format_variable(
|
format_variable(
|
||||||
#mqtt_packet_connect{
|
#mqtt_packet_connect{
|
||||||
|
|
|
@ -399,6 +399,12 @@ get_peer_info(Type, Listener, Req, Opts) ->
|
||||||
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||||
websocket_handle({binary, Data}, State) ->
|
websocket_handle({binary, Data}, State) ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "raw_bin_received",
|
||||||
|
size => iolist_size(Data),
|
||||||
|
bin => binary_to_list(binary:encode_hex(Data)),
|
||||||
|
type => "hex"
|
||||||
|
}),
|
||||||
State2 = ensure_stats_timer(State),
|
State2 = ensure_stats_timer(State),
|
||||||
{Packets, State3} = parse_incoming(Data, [], State2),
|
{Packets, State3} = parse_incoming(Data, [], State2),
|
||||||
LenMsg = erlang:length(Packets),
|
LenMsg = erlang:length(Packets),
|
||||||
|
@ -437,6 +443,7 @@ websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||||
NState = State#state{serialize = Serialize},
|
NState = State#state{serialize = Serialize},
|
||||||
handle_incoming(Packet, cancel_idle_timer(NState));
|
handle_incoming(Packet, cancel_idle_timer(NState));
|
||||||
websocket_info({incoming, Packet}, State) ->
|
websocket_info({incoming, Packet}, State) ->
|
||||||
|
?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||||
handle_incoming(Packet, State);
|
handle_incoming(Packet, State);
|
||||||
websocket_info({outgoing, Packets}, State) ->
|
websocket_info({outgoing, Packets}, State) ->
|
||||||
return(enqueue(Packets, State));
|
return(enqueue(Packets, State));
|
||||||
|
@ -719,7 +726,6 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when
|
handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when
|
||||||
is_record(Packet, mqtt_packet)
|
is_record(Packet, mqtt_packet)
|
||||||
->
|
->
|
||||||
?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}),
|
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
NState =
|
NState =
|
||||||
case
|
case
|
||||||
|
|
|
@ -237,7 +237,7 @@ do_async_set_keepalive() ->
|
||||||
{ok, _} = ?block_until(
|
{ok, _} = ?block_until(
|
||||||
#{
|
#{
|
||||||
?snk_kind := insert_channel_info,
|
?snk_kind := insert_channel_info,
|
||||||
client_id := ClientID
|
clientid := ClientID
|
||||||
},
|
},
|
||||||
2000,
|
2000,
|
||||||
100
|
100
|
||||||
|
|
Loading…
Reference in New Issue