From d4508a4f1d226a8515bb12e8a7828a9c410a0c0f Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 25 Jun 2024 16:52:48 +0800 Subject: [PATCH 1/7] chore: sync master `elvis.config` --- elvis.config | 131 ++++++++++++++++++++++++++++----------------------- 1 file changed, 73 insertions(+), 58 deletions(-) diff --git a/elvis.config b/elvis.config index 87d739865..5095318e0 100644 --- a/elvis.config +++ b/elvis.config @@ -1,62 +1,77 @@ %% -*- mode: erlang -*- [ - { - elvis, - [ - {config, - [ - #{dirs => ["src", "apps/**/src"], - filter => "*.erl", - ruleset => erl_files, - rules => [ - {elvis_style, macro_names, disable}, - {elvis_style, function_naming_convention, disable}, - {elvis_style, state_record_and_type, disable}, - {elvis_style, no_common_caveats_call, #{}}, - {elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal} - , {ct, print} - ]}}, - {elvis_style, operator_spaces, #{rules => [{right, "|"}, - {left, "|"}, - {right, "||"}, - {left, "||"}]}}, - {elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }}, - {elvis_style, god_modules, #{limit => 100}}, - {elvis_text_style, line_length, #{ limit => 120 % trust erlfmt - , skip_comments => false - }} - ] - }, - #{dirs => ["test", "apps/**/test"], - filter => "*.erl", - rules => [ - {elvis_text_style, line_length, #{ limit => 120 - , skip_comments => false - }}, - {elvis_style, dont_repeat_yourself, #{ min_complexity => 100 }}, - {elvis_style, nesting_level, #{ level => 6 }} - ] - }, - #{dirs => ["apps/emqx_rule_engine/src"], - filter => "*_rule_funcs.erl", - rules => [ - {elvis_style, god_modules, disable} - ] - }, - #{dirs => ["."], - filter => "Makefile", - ruleset => makefiles - }, - #{dirs => ["."], - filter => "rebar.config", - ruleset => rebar_config - }, - #{dirs => ["."], - filter => "elvis.config", - ruleset => elvis_config - } - ] + { + elvis, + [ + {config, [ + #{ + dirs => ["src", "apps/**/src"], + filter => "*.erl", + ruleset => erl_files, + rules => [ + {elvis_style, param_pattern_matching, disable}, + {elvis_style, macro_names, disable}, + {elvis_style, function_naming_convention, disable}, + {elvis_style, state_record_and_type, disable}, + {elvis_style, no_common_caveats_call, #{}}, + {elvis_style, no_debug_call, #{ + debug_functions => [ + {ct, pal}, + {ct, print} + ] + }}, + {elvis_style, operator_spaces, #{ + rules => [ + {right, "|"}, + {left, "|"}, + {right, "||"}, + {left, "||"} + ] + }}, + {elvis_style, dont_repeat_yourself, #{min_complexity => 20}}, + {elvis_style, god_modules, #{limit => 100}}, + % trust erlfmt + {elvis_text_style, line_length, #{ + limit => 120, + skip_comments => false + }} + ] + }, + #{ + dirs => ["test", "apps/**/test"], + filter => "*.erl", + rules => [ + {elvis_text_style, line_length, #{ + limit => 120, + skip_comments => false + }}, + {elvis_style, dont_repeat_yourself, #{min_complexity => 100}}, + {elvis_style, nesting_level, #{level => 6}} + ] + }, + #{ + dirs => ["apps/emqx_rule_engine/src"], + filter => "*_rule_funcs.erl", + rules => [ + {elvis_style, god_modules, disable} + ] + }, + #{ + dirs => ["."], + filter => "Makefile", + ruleset => makefiles + }, + #{ + dirs => ["."], + filter => "rebar.config", + ruleset => rebar_config + }, + #{ + dirs => ["."], + filter => "elvis.config", + ruleset => elvis_config + } + ]} + ] } - ] - } ]. From 6db1c0a446678e18656fde6fd45a7693a905208e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 25 Jun 2024 18:27:09 +0800 Subject: [PATCH 2/7] refactor: separate function to handle `frame_error` --- apps/emqx/src/emqx_channel.erl | 59 +++++++++++++++----------- apps/emqx/src/emqx_frame.erl | 2 +- apps/emqx/src/emqx_quic_connection.erl | 8 ++-- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a27c534e2..f7c76cade 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -563,29 +563,8 @@ handle_in( process_disconnect(ReasonCode, Properties, NChannel); handle_in(?AUTH_PACKET(), Channel) -> handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> - shutdown(shutdown_count(frame_error, Reason), Channel); -handle_in( - {frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting} -) -> - shutdown( - shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel - ); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> - shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); -handle_in( - {frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState} -) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), - {ok, Channel}; +handle_in({frame_error, Reason}, Channel) -> + handle_frame_error(Reason, Channel); handle_in(Packet, Channel) -> ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}), handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). @@ -1017,6 +996,37 @@ not_nacked({deliver, _Topic, Msg}) -> true end. +%%-------------------------------------------------------------------- +%% Handle Frame Error +%%-------------------------------------------------------------------- + +handle_frame_error( + Reason, + Channel = #channel{conn_state = idle} +) -> + shutdown(shutdown_count(frame_error, Reason), Channel); +handle_frame_error( + #{cause := frame_too_large} = R, Channel = #channel{conn_state = connecting} +) -> + shutdown( + shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel + ); +handle_frame_error(Reason, Channel = #channel{conn_state = connecting}) -> + shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); +handle_frame_error( + #{cause := frame_too_large}, Channel = #channel{conn_state = ConnState} +) when + ConnState =:= connected orelse ConnState =:= reauthenticating +-> + handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); +handle_frame_error(Reason, Channel = #channel{conn_state = ConnState}) when + ConnState =:= connected orelse ConnState =:= reauthenticating +-> + handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); +handle_frame_error(Reason, Channel = #channel{conn_state = disconnected}) -> + ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- @@ -2629,8 +2639,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) -> NAliases = maps:put(Topic, AliasId, Aliases), TopicAliases#{outbound => NAliases}. --compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}). - +-compile({inline, [reply/2, shutdown/2, shutdown/3]}). reply(Reply, Channel) -> {reply, Reply, Channel}. diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index a1c9084dd..f83a739ad 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -1134,7 +1134,7 @@ validate_connect_reserved(0) -> ok; validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag). %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] -validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos); +validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index df9520d90..c63014cea 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -62,7 +62,7 @@ streams := [{pid(), quicer:stream_handle()}], %% New stream opts stream_opts := map(), - %% If conneciton is resumed from session ticket + %% If connection is resumed from session ticket is_resumed => boolean(), %% mqtt message serializer config serialize => undefined, @@ -70,8 +70,8 @@ }. -type cb_ret() :: quicer_lib:cb_ret(). -%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked -%% for the activation from control stream after it is accepted as a legit conneciton. +%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked +%% for the activation from control stream after it is accepted as a legit connection. %% For security, the initial number of allowed data streams from client should be limited by %% 'peer_bidi_stream_count` & 'peer_unidi_stream_count` -spec activate_data_streams(pid(), { @@ -80,7 +80,7 @@ activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). -%% @doc conneciton owner init callback +%% @doc connection owner init callback -spec init(map()) -> {ok, cb_state()}. init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); From c313aa89f07c9918c87f6938795020368baca42d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 27 Jun 2024 16:39:11 +0800 Subject: [PATCH 3/7] fix: try throw proto_ver and proto_name when parsing CONNECT packet --- apps/emqx/src/emqx_channel.erl | 16 ++++--- apps/emqx/src/emqx_frame.erl | 78 ++++++++++++++++++++++++---------- 2 files changed, 64 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f7c76cade..f7b210a22 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -145,7 +145,9 @@ -type replies() :: emqx_types:packet() | reply() | [reply()]. -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}). - +-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState), + ((ConnState == connected) orelse (ConnState == reauthenticating)) +). -define(IS_COMMON_SESSION_TIMER(N), ((N == retry_delivery) orelse (N == expire_awaiting_rel)) ). @@ -333,7 +335,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) -> | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}. handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) -> @@ -1016,11 +1018,11 @@ handle_frame_error(Reason, Channel = #channel{conn_state = connecting}) -> handle_frame_error( #{cause := frame_too_large}, Channel = #channel{conn_state = ConnState} ) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); handle_frame_error(Reason, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); handle_frame_error(Reason, Channel = #channel{conn_state = disconnected}) -> @@ -1295,7 +1297,7 @@ handle_info( session = Session } ) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), @@ -2675,13 +2677,13 @@ disconnect_and_shutdown( ?IS_MQTT_V5 = #channel{conn_state = ConnState} ) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); %% mqtt v3/v4 connected sessions disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, Reply, NChannel); diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index f83a739ad..398a5f35c 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -267,27 +267,36 @@ packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. parse_connect(FrameBin, StrictMode) -> - {ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), - case ProtoName of - <<"MQTT">> -> - ok; - <<"MQIsdp">> -> - ok; - _ -> - %% from spec: the server MAY send disconnect with reason code 0x84 - %% we chose to close socket because the client is likely not talking MQTT anyway - ?PARSE_ERR(#{ - cause => invalid_proto_name, - expected => <<"'MQTT' or 'MQIsdp'">>, - received => ProtoName - }) - end, - parse_connect2(ProtoName, Rest, StrictMode). + {ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), + %% No need to parse and check proto_ver if proto_name is invalid, check it first + %% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2` + _ = validate_proto_name(ProtoName), + {IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0), + Meta = #{proto_name => ProtoName, proto_ver => ProtoVer}, + try + do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode) + catch + throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) -> + ?PARSE_ERR(maps:merge(ReasonM, Meta)); + throw:{?FRAME_PARSE_ERROR, Reason} -> + ?PARSE_ERR(Meta#{cause => Reason}) + end. -parse_connect2( +do_parse_connect( ProtoName, - <>, + IsBridge, + ProtoVer, + << + UsernameFlagB:1, + PasswordFlagB:1, + WillRetainB:1, + WillQoS:2, + WillFlagB:1, + CleanStart:1, + Reserved:1, + KeepAlive:16/big, + Rest/binary + >>, StrictMode ) -> _ = validate_connect_reserved(Reserved), @@ -302,14 +311,14 @@ parse_connect2( UsernameFlag = bool(UsernameFlagB), PasswordFlag = bool(PasswordFlagB) ), - {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), + {Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode), {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid), ConnPacket = #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, %% For bridge mode, non-standard implementation %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html - is_bridge = (BridgeTag =:= 8), + is_bridge = IsBridge, clean_start = bool(CleanStart), will_flag = WillFlag, will_qos = WillQoS, @@ -342,8 +351,8 @@ parse_connect2( unexpected_trailing_bytes => size(Rest7) }) end; -parse_connect2(_ProtoName, Bin, _StrictMode) -> - %% sent less than 32 bytes +do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) -> + %% sent less than 24 bytes ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}). parse_packet( @@ -515,6 +524,12 @@ parse_packet_id(<>) -> parse_packet_id(_) -> ?PARSE_ERR(invalid_packet_id). +parse_connect_proto_ver(<>) -> + {_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest}; +parse_connect_proto_ver(Bin) -> + %% sent less than 1 bytes or empty + ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}). + parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> {#{}, Bin}; %% TODO: version mess? @@ -1129,10 +1144,25 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos); validate_subqos([_ | T]) -> validate_subqos(T); validate_subqos([]) -> ok. +%% from spec: the server MAY send disconnect with reason code 0x84 +%% we chose to close socket because the client is likely not talking MQTT anyway +validate_proto_name(<<"MQTT">>) -> + ok; +validate_proto_name(<<"MQIsdp">>) -> + ok; +validate_proto_name(ProtoName) -> + ?PARSE_ERR(#{ + cause => invalid_proto_name, + expected => <<"'MQTT' or 'MQIsdp'">>, + received => ProtoName + }). + %% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3] +-compile({inline, [validate_connect_reserved/1]}). validate_connect_reserved(0) -> ok; validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag). +-compile({inline, [validate_connect_will/3]}). %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] @@ -1141,6 +1171,7 @@ validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_w validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain); validate_connect_will(_, _, _) -> ok. +-compile({inline, [validate_connect_password_flag/4]}). %% MQTT-v3.1 %% Username flag and password flag are not strongly related %% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect @@ -1155,6 +1186,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) -> validate_connect_password_flag(_, _, _, _) -> ok. +-compile({inline, [bool/1]}). bool(0) -> false; bool(1) -> true. From 37a89d009480294a4c37b2d9446dd8a159063ad5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Jul 2024 11:38:10 +0800 Subject: [PATCH 4/7] fix: enrich parse_state and connection serialize opts --- apps/emqx/include/emqx_mqtt.hrl | 1 + apps/emqx/src/emqx_channel.erl | 78 +++++++++++++++++++++------- apps/emqx/src/emqx_connection.erl | 9 +++- apps/emqx/src/emqx_frame.erl | 27 +++++++--- apps/emqx/src/emqx_ws_connection.erl | 11 +++- 5 files changed, 100 insertions(+), 26 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 09f7495ea..1c3fd770c 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -683,6 +683,7 @@ end). -define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error). + -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f7b210a22..07aad9b24 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -37,6 +37,7 @@ get_mqtt_conf/2, get_mqtt_conf/3, set_conn_state/2, + set_conninfo_proto_ver/2, stats/1, caps/1 ]). @@ -219,6 +220,9 @@ info(impl, #channel{session = Session}) -> set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. +set_conninfo_proto_ver({none, #{version := ProtoVer}}, Channel = #channel{conninfo = ConnInfo}) -> + Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}}. + -spec stats(channel()) -> emqx_types:stats(). stats(#channel{session = undefined}) -> emqx_pd:get_counters(?CHANNEL_METRICS); @@ -1003,29 +1007,60 @@ not_nacked({deliver, _Topic, Msg}) -> %%-------------------------------------------------------------------- handle_frame_error( - Reason, - Channel = #channel{conn_state = idle} -) -> - shutdown(shutdown_count(frame_error, Reason), Channel); -handle_frame_error( - #{cause := frame_too_large} = R, Channel = #channel{conn_state = connecting} -) -> - shutdown( - shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel - ); -handle_frame_error(Reason, Channel = #channel{conn_state = connecting}) -> - shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); -handle_frame_error( - #{cause := frame_too_large}, Channel = #channel{conn_state = ConnState} + Reason = #{cause := frame_too_large}, + Channel = #channel{conn_state = ConnState, conninfo = ConnInfo} ) when ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> - handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); -handle_frame_error(Reason, Channel = #channel{conn_state = ConnState}) when + ShutdownCount = shutdown_count(frame_error, Reason), + case proto_ver(Reason, ConnInfo) of + ?MQTT_PROTO_V5 -> + handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); + _ -> + shutdown(ShutdownCount, Channel) + end; +%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting, +%% otherwise DONOT send any CONNACK or DISCONNECT packet. +handle_frame_error( + Reason, + Channel = #channel{conn_state = ConnState, conninfo = ConnInfo} +) when + is_map(Reason) andalso + (ConnState == idle orelse ConnState == connecting) +-> + ShutdownCount = shutdown_count(frame_error, Reason), + ProtoVer = proto_ver(Reason, ConnInfo), + NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}}, + case ProtoVer of + ?MQTT_PROTO_V5 -> + shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel); + _ -> + shutdown(ShutdownCount, NChannel) + end; +handle_frame_error( + Reason, + Channel = #channel{conn_state = connecting} +) -> + shutdown( + shutdown_count(frame_error, Reason), + ?CONNACK_PACKET(?RC_MALFORMED_PACKET), + Channel + ); +handle_frame_error( + Reason, + Channel = #channel{conn_state = ConnState} +) when ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> - handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); -handle_frame_error(Reason, Channel = #channel{conn_state = disconnected}) -> + handle_out( + disconnect, + {?RC_MALFORMED_PACKET, Reason}, + Channel + ); +handle_frame_error( + Reason, + Channel = #channel{conn_state = disconnected} +) -> ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), {ok, Channel}. @@ -2726,6 +2761,13 @@ is_durable_session(#channel{session = Session}) -> false end. +proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) -> + ProtoVer; +proto_ver(_Reason, #{proto_ver := ProtoVer}) -> + ProtoVer; +proto_ver(_, _) -> + ?MQTT_PROTO_V4. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index f378b700e..ecb962f08 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -782,7 +782,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> input_bytes => Data, parsed_packets => Packets }), - {[{frame_error, Reason} | Packets], State}; + NState = enrich_state(Reason, State), + {[{frame_error, Reason} | Packets], NState}; error:Reason:Stacktrace -> ?LOG(error, #{ at_state => emqx_frame:describe_state(ParseState), @@ -1204,6 +1205,12 @@ inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. +enrich_state(#{parse_state := NParseState}, State) -> + Serialize = emqx_frame:serialize_opts(NParseState), + State#state{parse_state = NParseState, serialize = Serialize}; +enrich_state(_, State) -> + State. + set_tcp_keepalive({quic, _Listener}) -> ok; set_tcp_keepalive({Type, Id}) -> diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 398a5f35c..554847d67 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -266,20 +266,33 @@ packet(Header, Variable) -> packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. -parse_connect(FrameBin, StrictMode) -> +parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) -> {ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), %% No need to parse and check proto_ver if proto_name is invalid, check it first %% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2` _ = validate_proto_name(ProtoName), {IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0), - Meta = #{proto_name => ProtoName, proto_ver => ProtoVer}, + NOptions = Options#{version => ProtoVer}, try do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode) catch throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) -> - ?PARSE_ERR(maps:merge(ReasonM, Meta)); + ?PARSE_ERR( + ReasonM#{ + proto_ver => ProtoVer, + proto_name => ProtoName, + parse_state => ?NONE(NOptions) + } + ); throw:{?FRAME_PARSE_ERROR, Reason} -> - ?PARSE_ERR(Meta#{cause => Reason}) + ?PARSE_ERR( + #{ + cause => Reason, + proto_ver => ProtoVer, + proto_name => ProtoName, + parse_state => ?NONE(NOptions) + } + ) end. do_parse_connect( @@ -358,9 +371,9 @@ do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) -> parse_packet( #mqtt_packet_header{type = ?CONNECT}, FrameBin, - #{strict_mode := StrictMode} + Options ) -> - parse_connect(FrameBin, StrictMode); + parse_connect(FrameBin, Options); parse_packet( #mqtt_packet_header{type = ?CONNACK}, <>, @@ -753,6 +766,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) -> serialize_opts() -> ?DEFAULT_OPTIONS. +serialize_opts(?NONE(Options)) -> + maps:merge(?DEFAULT_OPTIONS, Options); serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), #{version => ProtoVer, max_size => MaxSize}. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 5d04b3304..4765fdace 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) -> %% TODO: should not close the ws connection ?LOG(error, #{msg => "unexpected_frame", frame => Frame}), shutdown(unexpected_ws_frame, State). + websocket_info({call, From, Req}, State) -> handle_call(From, Req, State); websocket_info({cast, rate_limit}, State) -> @@ -725,7 +726,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> input_bytes => Data }), FrameError = {frame_error, Reason}, - {[{incoming, FrameError} | Packets], State}; + NState = enrich_state(Reason, State), + {[{incoming, FrameError} | Packets], NState}; error:Reason:Stacktrace -> ?LOG(error, #{ at_state => emqx_frame:describe_state(ParseState), @@ -1059,6 +1061,13 @@ check_max_connection(Type, Listener) -> {denny, Reason} end end. + +enrich_state(#{parse_state := NParseState}, State) -> + Serialize = emqx_frame:serialize_opts(NParseState), + State#state{parse_state = NParseState, serialize = Serialize}; +enrich_state(_, State) -> + State. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- From 7a251c9ead89c04fda48f22d901f8b3350d50519 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Jul 2024 15:28:39 +0800 Subject: [PATCH 5/7] test: handle frame error for CONNECT packets --- apps/emqx/test/emqx_channel_SUITE.erl | 34 ++++++++++------- apps/emqx/test/emqx_frame_SUITE.erl | 54 ++++++++++++++++++++++----- 2 files changed, 65 insertions(+), 23 deletions(-) diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index d157cc914..83b862892 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -414,24 +414,32 @@ t_handle_in_auth(_) -> emqx_channel:handle_in(?AUTH_PACKET(), Channel). t_handle_in_frame_error(_) -> - IdleChannel = channel(#{conn_state => idle}), - {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} = - emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel), + IdleChannelV5 = channel(#{conn_state => idle}), + %% no CONNACK packet for v4 + ?assertMatch( + {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan}, + emqx_channel:handle_in( + {frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5) + ) + ), + ConnectingChan = channel(#{conn_state => connecting}), ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), - {shutdown, - #{ - shutdown_count := frame_too_large, - cause := frame_too_large, - limit := 100, - received := 101 - }, - ConnackPacket, - _} = + ?assertMatch( + {shutdown, + #{ + shutdown_count := frame_too_large, + cause := frame_too_large, + limit := 100, + received := 101 + }, + ConnackPacket, _}, emqx_channel:handle_in( {frame_error, #{cause => frame_too_large, received => 101, limit => 100}}, ConnectingChan - ), + ) + ), + DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE), ConnectedChan = channel(#{conn_state => connected}), ?assertMatch( diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 9c8a99547..0c5a36231 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -63,6 +63,7 @@ groups() -> t_parse_malformed_properties, t_malformed_connect_header, t_malformed_connect_data, + t_malformed_connect_data_proto_ver, t_reserved_connect_flag, t_invalid_clientid, t_undefined_password, @@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) -> ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). +%% TODO: parse v3 with 0 length clientid + t_serialize_parse_v3_connect(_) -> Bin = <<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117, @@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) -> header = #mqtt_packet_header{type = ?CONNECT}, variable = #mqtt_packet_connect{ clientid = <<"C_00:0C:29:2B:77:52">>, - proto_ver = 16#03, + proto_ver = ?MQTT_PROTO_V3, proto_name = <<"MQIsdp">>, is_bridge = true, will_retain = true, @@ -686,15 +689,36 @@ t_malformed_connect_header(_) -> ). t_malformed_connect_data(_) -> + ProtoNameWithLen = <<0, 6, "MQIsdp">>, + ConnectFlags = <<2#00000000>>, + ClientIdwithLen = <<0, 1, "a">>, + UnexpectedRestBin = <<0, 1, 2>>, ?ASSERT_FRAME_THROW( - #{cause := malformed_connect, unexpected_trailing_bytes := _}, - emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>) + #{cause := malformed_connect, unexpected_trailing_bytes := 3}, + emqx_frame:parse( + <<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0, + ClientIdwithLen/binary, UnexpectedRestBin/binary>> + ) + ). + +t_malformed_connect_data_proto_ver(_) -> + Proto3NameWithLen = <<0, 6, "MQIsdp">>, + ?ASSERT_FRAME_THROW( + #{cause := malformed_connect, header_bytes := <<>>}, + emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>) + ), + ProtoNameWithLen = <<0, 4, "MQTT">>, + ?ASSERT_FRAME_THROW( + #{cause := malformed_connect, header_bytes := <<>>}, + emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>) ). t_reserved_connect_flag(_) -> ?assertException( throw, - {frame_parse_error, reserved_connect_flag}, + {frame_parse_error, #{ + cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">> + }}, emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>) ). @@ -726,7 +750,7 @@ t_undefined_password(_) -> }, variable = #mqtt_packet_connect{ proto_name = <<"MQTT">>, - proto_ver = 4, + proto_ver = ?MQTT_PROTO_V4, is_bridge = false, clean_start = true, will_flag = false, @@ -774,7 +798,9 @@ t_invalid_will_retain(_) -> 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>, ?assertException( throw, - {frame_parse_error, invalid_will_retain}, + {frame_parse_error, #{ + cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBin) ), ok. @@ -796,22 +822,30 @@ t_invalid_will_qos(_) -> ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3)) ), ok. From 15b3f4deb04104704faf26e2f9a43a4ba64e8b75 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Aug 2024 15:00:24 +0800 Subject: [PATCH 6/7] fix: rm unused func and exports --- apps/emqx/src/emqx_channel.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 07aad9b24..6c9c8f8a8 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -37,7 +37,6 @@ get_mqtt_conf/2, get_mqtt_conf/3, set_conn_state/2, - set_conninfo_proto_ver/2, stats/1, caps/1 ]). @@ -220,9 +219,6 @@ info(impl, #channel{session = Session}) -> set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. -set_conninfo_proto_ver({none, #{version := ProtoVer}}, Channel = #channel{conninfo = ConnInfo}) -> - Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}}. - -spec stats(channel()) -> emqx_types:stats(). stats(#channel{session = undefined}) -> emqx_pd:get_counters(?CHANNEL_METRICS); From 4915cc0da6d89823972245fbb65dc47f7270b256 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Aug 2024 15:23:35 +0800 Subject: [PATCH 7/7] chore: add changelog entry for 13357 --- changes/ce/fix-13357.en.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/ce/fix-13357.en.md diff --git a/changes/ce/fix-13357.en.md b/changes/ce/fix-13357.en.md new file mode 100644 index 000000000..ea497a847 --- /dev/null +++ b/changes/ce/fix-13357.en.md @@ -0,0 +1,4 @@ +Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets. + +- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected. +- Otherwise **DONOT** send any CONNACK or DISCONNECT packet.