From 6cbad047cd1283f0c4b50975c28aaf1e063abde2 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 12:16:17 +0800 Subject: [PATCH 1/6] fix: don't log CONNECT twice when debug --- apps/emqx/src/emqx_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 5ed302a6f..714b077ca 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -550,6 +550,7 @@ handle_msg( }, handle_incoming(Packet, NState); handle_msg({incoming, Packet}, State) -> + ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), handle_incoming(Packet, State); handle_msg({outgoing, Packets}, State) -> handle_outgoing(Packets, State); @@ -783,7 +784,6 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> with_channel(handle_in, [FrameError], State). From ce32ea7334da3cdb925d0c556f3e7d1a6dca98dc Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 12:18:22 +0800 Subject: [PATCH 2/6] feat: Make the log output format order fixed --- apps/emqx/include/logger.hrl | 4 +- apps/emqx/src/emqx_cm.erl | 2 +- apps/emqx/src/emqx_logger_textfmt.erl | 65 ++++++++++++++------------- apps/emqx/test/emqx_mqtt_SUITE.erl | 2 +- 4 files changed, 39 insertions(+), 34 deletions(-) diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index e93aa46f4..27ffc6cc0 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -48,9 +48,9 @@ -define(TRACE(Level, Tag, Msg, Meta), begin case persistent_term:get(?TRACE_FILTER, []) of [] -> ok; - %% We can't bind filter list to a variablebecause we pollute the calling scope with it. + %% We can't bind filter list to a variable because we pollute the calling scope with it. %% We also don't want to wrap the macro body in a fun - %% beacause this adds overhead to the happy path. + %% because this adds overhead to the happy path. %% So evaluate `persistent_term:get` twice. _ -> emqx_trace:log(persistent_term:get(?TRACE_FILTER, []), Msg, (Meta)#{trace_tag => Tag}) end, diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 66e9a2aee..77bc44eeb 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -152,7 +152,7 @@ start_link() -> insert_channel_info(ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), - ?tp(debug, insert_channel_info, #{client_id => ClientId}), + ?tp(debug, insert_channel_info, #{clientid => ClientId}), ok. %% @private diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index 3695929d9..c1d85f341 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -22,20 +22,49 @@ check_config(X) -> logger_formatter:check_config(X). -format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) -> - Report1 = enrich_report_mfa(Report0, Meta), - Report2 = enrich_report_clientid(Report1, Meta), - Report3 = enrich_report_peername(Report2, Meta), - Report4 = enrich_report_topic(Report3, Meta), - logger_formatter:format(Event#{msg := {report, Report4}}, Config); +format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) -> + Report = enrich_report(ReportMap, Meta), + logger_formatter:format(Event#{msg := {report, Report}}, Config); format(#{msg := {string, String}} = Event, Config) -> format(Event#{msg => {"~ts ", [String]}}, Config); +%% trace format(#{msg := Msg0, meta := Meta} = Event, Config) -> Msg1 = enrich_client_info(Msg0, Meta), Msg2 = enrich_mfa(Msg1, Meta), Msg3 = enrich_topic(Msg2, Meta), logger_formatter:format(Event#{msg := Msg3}, Config). +enrich_report(ReportRaw, Meta) -> + %% clientid and peername always in emqx_conn's process metadata. + %% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2 + Topic = + case maps:get(topic, Meta, undefined) of + undefined -> maps:get(topic, ReportRaw, undefined); + Topic0 -> Topic0 + end, + ClientId = maps:get(clientid, Meta, undefined), + Peer = maps:get(peername, Meta, undefined), + MFA = maps:get(mfa, Meta, undefined), + Line = maps:get(line, Meta, undefined), + Msg = maps:get(msg, ReportRaw, undefined), + lists:foldl( + fun + ({_, undefined}, Acc) -> Acc; + (Item, Acc) -> [Item | Acc] + end, + maps:to_list(maps:without([topic, msg, clientid], ReportRaw)), + [ + {topic, try_format_unicode(Topic)}, + {clientid, try_format_unicode(ClientId)}, + {peername, Peer}, + {line, Line}, + {mfa, mfa(MFA)}, + {msg, Msg} + ] + ). + +try_format_unicode(undefined) -> + undefined; try_format_unicode(Char) -> List = try @@ -53,30 +82,6 @@ try_format_unicode(Char) -> _ -> List end. -enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) -> - Report#{mfa => mfa(Mfa), line => Line}; -enrich_report_mfa(Report, _) -> - Report. - -enrich_report_clientid(Report, #{clientid := ClientId}) -> - Report#{clientid => try_format_unicode(ClientId)}; -enrich_report_clientid(Report, _) -> - Report. - -enrich_report_peername(Report, #{peername := Peername}) -> - Report#{peername => Peername}; -enrich_report_peername(Report, _) -> - Report. - -%% clientid and peername always in emqx_conn's process metadata. -%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2 -enrich_report_topic(Report, #{topic := Topic}) -> - Report#{topic => try_format_unicode(Topic)}; -enrich_report_topic(Report = #{topic := Topic}, _) -> - Report#{topic => try_format_unicode(Topic)}; -enrich_report_topic(Report, _) -> - Report. - enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> {Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]}; enrich_mfa(Msg, _) -> diff --git a/apps/emqx/test/emqx_mqtt_SUITE.erl b/apps/emqx/test/emqx_mqtt_SUITE.erl index 287d7fdba..d0162b34b 100644 --- a/apps/emqx/test/emqx_mqtt_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_SUITE.erl @@ -237,7 +237,7 @@ do_async_set_keepalive() -> {ok, _} = ?block_until( #{ ?snk_kind := insert_channel_info, - client_id := ClientID + clientid := ClientID }, 2000, 100 From 25090563afe5715e70c0b730c0bcfd915ba5a399 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 12:28:51 +0800 Subject: [PATCH 3/6] chore: use brackets to wrap the mqtt packet when logging --- apps/emqx/src/emqx_packet.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index c247419f0..9fa0b00da 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -477,8 +477,8 @@ format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()). format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) -> HeaderIO = format_header(Header), case format_variable(Variable, Payload, PayloadEncode) of - "" -> HeaderIO; - VarIO -> [HeaderIO, ",", VarIO] + "" -> [HeaderIO, ")"]; + VarIO -> [HeaderIO, ", ", VarIO, ")"] end. format_header(#mqtt_packet_header{ @@ -487,14 +487,14 @@ format_header(#mqtt_packet_header{ qos = QoS, retain = Retain }) -> - io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]). + io_lib:format("~ts(Q~p, R~p, D~p", [type_name(Type), QoS, i(Retain), i(Dup)]). format_variable(undefined, _, _) -> ""; format_variable(Variable, undefined, PayloadEncode) -> format_variable(Variable, PayloadEncode); format_variable(Variable, Payload, PayloadEncode) -> - [format_variable(Variable, PayloadEncode), ",", format_payload(Payload, PayloadEncode)]. + [format_variable(Variable, PayloadEncode), ", ", format_payload(Payload, PayloadEncode)]. format_variable( #mqtt_packet_connect{ From 3d07271ea50bab0fdbfffa8193a2d41e512fec14 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 14:30:20 +0800 Subject: [PATCH 4/6] fix: crash when mfa not found --- apps/emqx/src/emqx_logger_textfmt.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index c1d85f341..fe67153ec 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -101,4 +101,5 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) -> enrich_topic(Msg, _) -> Msg. +mfa(undefined) -> undefined; mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). From b73d11675e754ea082d208e17a567541792b633e Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 15:05:49 +0800 Subject: [PATCH 5/6] chore: log the bad mqtt packet(frame error) --- apps/emqx/src/emqx_logger_textfmt.erl | 2 +- apps/emqx/src/emqx_packet.erl | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index fe67153ec..fb27681b8 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -102,4 +102,4 @@ enrich_topic(Msg, _) -> Msg. mfa(undefined) -> undefined; -mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). +mfa({M, F, A}) -> [atom_to_list(M), ":", atom_to_list(F), "/" ++ integer_to_list(A)]. diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 9fa0b00da..32bd3df53 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -479,7 +479,11 @@ format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Pa case format_variable(Variable, Payload, PayloadEncode) of "" -> [HeaderIO, ")"]; VarIO -> [HeaderIO, ", ", VarIO, ")"] - end. + end; +%% receive a frame error packet, such as {frame_error,frame_too_large} or +%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}} +format(FrameError, _PayloadEncode) -> + lists:flatten(io_lib:format("~tp", [FrameError])). format_header(#mqtt_packet_header{ type = Type, From bb636394e19f9b3a86a7c335caa52e449a996ab1 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 16:35:06 +0800 Subject: [PATCH 6/6] chore: add debug log for raw data --- apps/emqx/src/emqx_connection.erl | 6 ++++++ apps/emqx/src/emqx_ws_connection.erl | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 714b077ca..5b783f2fe 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -732,6 +732,12 @@ handle_timeout(TRef, Msg, State) -> %% Parse incoming data -compile({inline, [when_bytes_in/3]}). when_bytes_in(Oct, Data, State) -> + ?SLOG(debug, #{ + msg => "raw_bin_received", + size => Oct, + bin => binary_to_list(binary:encode_hex(Data)), + type => "hex" + }), {Packets, NState} = parse_incoming(Data, [], State), Len = erlang:length(Packets), check_limiter( diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 817c4b505..ead609ed8 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -399,6 +399,12 @@ get_peer_info(Type, Listener, Req, Opts) -> websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, Data}, State) -> + ?SLOG(debug, #{ + msg => "raw_bin_received", + size => iolist_size(Data), + bin => binary_to_list(binary:encode_hex(Data)), + type => "hex" + }), State2 = ensure_stats_timer(State), {Packets, State3} = parse_incoming(Data, [], State2), LenMsg = erlang:length(Packets), @@ -437,6 +443,7 @@ websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> NState = State#state{serialize = Serialize}, handle_incoming(Packet, cancel_idle_timer(NState)); websocket_info({incoming, Packet}, State) -> + ?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}), handle_incoming(Packet, State); websocket_info({outgoing, Packets}, State) -> return(enqueue(Packets, State)); @@ -719,7 +726,6 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> - ?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}), ok = inc_incoming_stats(Packet), NState = case