From c1ff30896a5f0ca04aafc8efddcc25301f3cc92d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 27 Sep 2021 15:02:23 +0800 Subject: [PATCH] fix(frame): safely serializing and structured log. --- apps/emqx/include/emqx_mqtt.hrl | 6 ++++-- apps/emqx/src/emqx_connection.erl | 27 +++++++++++++++++++-------- apps/emqx/src/emqx_frame.erl | 3 ++- apps/emqx/src/emqx_ws_connection.erl | 25 ++++++++++++++++++------- apps/emqx/test/emqx_frame_SUITE.erl | 3 +-- 5 files changed, 44 insertions(+), 20 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 4a64476b8..ae13fcf14 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -542,7 +542,9 @@ -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). -define(IS_SHARE(Topic), case Topic of <> -> true; _ -> false end). --define(FRAME_ERROR(Reason), {frame_error, Reason}). --define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_ERROR(Reason))). +-define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}). +-define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}). +-define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))). +-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))). -endif. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 23ac8d71a..2fe3ea8c9 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -644,7 +644,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet|Packets], NState) catch - throw : ?FRAME_ERROR(Reason) -> + throw : ?FRAME_PARSE_ERROR(Reason) -> ?SLOG(info, #{ reason => Reason , at_state => emqx_frame:describe_state(ParseState) , input_bytes => Data @@ -652,12 +652,12 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> }), {[{frame_error, Reason} | Packets], State}; error : Reason : Stacktrace -> - ?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState) - , input_bytes => Data - , parsed_packets => Packets - , exception => Reason - , stacktrace => Stacktrace - }), + ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + , parsed_packets => Packets + , exception => Reason + , stacktrace => Stacktrace + }), {[{frame_error, Reason} | Packets], State} end. @@ -707,7 +707,7 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case emqx_frame:serialize_pkt(Packet, Serialize) of + try emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), @@ -716,6 +716,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), Data + catch + %% Maybe Never happen. + throw : ?FRAME_SERIALIZE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , input_packet => Packet}), + erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + error : Reason : Stacktrace -> + ?SLOG(error, #{ input_packet => Packet + , exception => Reason + , stacktrace => Stacktrace}), + erlang:raise(error, Reason, Stacktrace) end end. diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index c38d2fd9c..2fe1b6d1a 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -74,6 +74,7 @@ }). -define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)). +-define(SERIALIZE_ERR(Reason), ?THROW_SERIALIZE_ERROR(Reason)). -define(MULTIPLIER_MAX, 16#200000). @@ -781,7 +782,7 @@ serialize_binary_data(Bin) -> [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. serialize_utf8_string(undefined, false) -> - ?PARSE_ERR(utf8_string_undefined); + ?SERIALIZE_ERR(utf8_string_undefined); serialize_utf8_string(undefined, true) -> <<>>; serialize_utf8_string(String, _AllowNull) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index a41026b8e..4c5299b9e 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -547,7 +547,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, postpone({incoming, Packet}, NState)) catch - throw : ?FRAME_ERROR(Reason) -> + throw : ?FRAME_PARSE_ERROR(Reason) -> ?SLOG(info, #{ reason => Reason , at_state => emqx_frame:describe_state(ParseState) , input_bytes => Data @@ -555,11 +555,11 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> FrameError = {frame_error, Reason}, postpone({incoming, FrameError}, State); error : Reason : Stacktrace -> - ?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState) - , input_bytes => Data - , exception => Reason - , stacktrace => Stacktrace - }), + ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + , exception => Reason + , stacktrace => Stacktrace + }), FrameError = {frame_error, Reason}, postpone({incoming, FrameError}, State) end. @@ -627,7 +627,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case emqx_frame:serialize_pkt(Packet, Serialize) of + try emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), @@ -636,6 +636,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), Data + catch + %% Maybe Never happen. + throw : ?FRAME_SERIALIZE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , input_packet => Packet}), + erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + error : Reason : Stacktrace -> + ?SLOG(error, #{ input_packet => Packet + , exception => Reason + , stacktrace => Stacktrace}), + erlang:raise(error, Reason, Stacktrace) end end. diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 4b5cd374b..6d3bccd99 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -24,7 +24,7 @@ -include_lib("common_test/include/ct.hrl"). -define(ASSERT_FRAME_THROW(Reason, Expr), - ?assertThrow(?FRAME_ERROR(Reason), Expr)). + ?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr)). all() -> [{group, parse}, @@ -552,4 +552,3 @@ parse_to_packet(Bin, Opts) -> Packet. payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). -