diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index e0baab238..f115ad216 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -346,7 +346,7 @@ init_state( max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_opts(), + Serialize = emqx_frame:initial_serialize_opts(FrameOpts), %% Init Channel Channel = emqx_channel:init(ConnInfo, Opts), GcState = diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 0b02ad1f5..661fc7861 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -29,11 +29,12 @@ parse/2, serialize_fun/0, serialize_fun/1, - serialize_opts/0, + initial_serialize_opts/1, serialize_opts/1, serialize_pkt/2, serialize/1, - serialize/2 + serialize/2, + serialize/3 ]). -export([describe_state/1]). @@ -84,7 +85,7 @@ -define(MULTIPLIER_MAX, 16#200000). --dialyzer({no_match, [serialize_utf8_string/2]}). +-dialyzer({no_match, [serialize_utf8_string/3]}). %% @doc Describe state for logging. describe_state(?NONE(_Opts)) -> @@ -718,43 +719,51 @@ serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS). serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), - serialize_fun(#{version => ProtoVer, max_size => MaxSize}); -serialize_fun(#{version := Ver, max_size := MaxSize}) -> + serialize_fun(#{version => ProtoVer, max_size => MaxSize, strict_mode => false}); +serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode}) -> fun(Packet) -> - IoData = serialize(Packet, Ver), + IoData = serialize(Packet, Ver, StrictMode), case is_too_large(IoData, MaxSize) of true -> <<>>; false -> IoData end end. -serialize_opts() -> - ?DEFAULT_OPTIONS. +initial_serialize_opts(Opts) -> + maps:merge(?DEFAULT_OPTIONS, Opts). serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), - #{version => ProtoVer, max_size => MaxSize}. + #{version => ProtoVer, max_size => MaxSize, strict_mode => false}. -serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) -> - IoData = serialize(Packet, Ver), +serialize_pkt(Packet, #{version := Ver, max_size := MaxSize, strict_mode := StrictMode}) -> + IoData = serialize(Packet, Ver, StrictMode), case is_too_large(IoData, MaxSize) of true -> <<>>; false -> IoData end. -spec serialize(emqx_types:packet()) -> iodata(). -serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). +serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4, false). --spec serialize(emqx_types:packet(), emqx_types:proto_ver()) -> iodata(). +serialize(Packet, Ver) -> serialize(Packet, Ver, false). + +-spec serialize(emqx_types:packet(), emqx_types:proto_ver(), boolean()) -> iodata(). serialize( #mqtt_packet{ header = Header, variable = Variable, payload = Payload }, - Ver + Ver, + StrictMode ) -> - serialize(Header, serialize_variable(Variable, Ver), serialize_payload(Payload)). + serialize( + Header, + serialize_variable(Variable, Ver, StrictMode), + serialize_payload(Payload), + StrictMode + ). serialize( #mqtt_packet_header{ @@ -764,7 +773,8 @@ serialize( retain = Retain }, VariableBin, - PayloadBin + PayloadBin, + _StrictMode ) when ?CONNECT =< Type andalso Type =< ?AUTH -> @@ -796,7 +806,8 @@ serialize_variable( username = Username, password = Password }, - _Ver + _Ver, + StrictMode ) -> [ serialize_binary_data(ProtoName), @@ -814,20 +825,20 @@ serialize_variable( 0:1, KeepAlive:16/big-unsigned-integer >>, - serialize_properties(Properties, ProtoVer), - serialize_utf8_string(ClientId), + serialize_properties(Properties, ProtoVer, StrictMode), + serialize_utf8_string(ClientId, StrictMode), case WillFlag of true -> [ - serialize_properties(WillProps, ProtoVer), - serialize_utf8_string(WillTopic), + serialize_properties(WillProps, ProtoVer, StrictMode), + serialize_utf8_string(WillTopic, StrictMode), serialize_binary_data(WillPayload) ]; false -> <<>> end, - serialize_utf8_string(Username, true), - serialize_utf8_string(Password, true) + serialize_utf8_string(Username, true, StrictMode), + serialize_utf8_string(Password, true, StrictMode) ]; serialize_variable( #mqtt_packet_connack{ @@ -835,26 +846,28 @@ serialize_variable( reason_code = ReasonCode, properties = Properties }, - Ver + Ver, + StrictMode ) -> - [AckFlags, ReasonCode, serialize_properties(Properties, Ver)]; + [AckFlags, ReasonCode, serialize_properties(Properties, Ver, StrictMode)]; serialize_variable( #mqtt_packet_publish{ topic_name = TopicName, packet_id = PacketId, properties = Properties }, - Ver + Ver, + StrictMode ) -> [ - serialize_utf8_string(TopicName), + serialize_utf8_string(TopicName, StrictMode), case PacketId of undefined -> <<>>; _ -> <> end, - serialize_properties(Properties, Ver) + serialize_properties(Properties, Ver, StrictMode) ]; -serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver) when +serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver, _StrictMode) when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> <>; @@ -864,12 +877,13 @@ serialize_variable( reason_code = ReasonCode, properties = Properties }, - Ver = ?MQTT_PROTO_V5 + Ver = ?MQTT_PROTO_V5, + StrictMode ) -> [ <>, ReasonCode, - serialize_properties(Properties, Ver) + serialize_properties(Properties, Ver, StrictMode) ]; serialize_variable( #mqtt_packet_subscribe{ @@ -877,12 +891,13 @@ serialize_variable( properties = Properties, topic_filters = TopicFilters }, - Ver + Ver, + StrictMode ) -> [ <>, - serialize_properties(Properties, Ver), - serialize_topic_filters(subscribe, TopicFilters, Ver) + serialize_properties(Properties, Ver, StrictMode), + serialize_topic_filters(subscribe, TopicFilters, Ver, StrictMode) ]; serialize_variable( #mqtt_packet_suback{ @@ -890,11 +905,12 @@ serialize_variable( properties = Properties, reason_codes = ReasonCodes }, - Ver + Ver, + StrictMode ) -> [ <>, - serialize_properties(Properties, Ver), + serialize_properties(Properties, Ver, StrictMode), serialize_reason_codes(ReasonCodes) ]; serialize_variable( @@ -903,12 +919,13 @@ serialize_variable( properties = Properties, topic_filters = TopicFilters }, - Ver + Ver, + StrictMode ) -> [ <>, - serialize_properties(Properties, Ver), - serialize_topic_filters(unsubscribe, TopicFilters, Ver) + serialize_properties(Properties, Ver, StrictMode), + serialize_topic_filters(unsubscribe, TopicFilters, Ver, StrictMode) ]; serialize_variable( #mqtt_packet_unsuback{ @@ -916,14 +933,15 @@ serialize_variable( properties = Properties, reason_codes = ReasonCodes }, - Ver + Ver, + StrictMode ) -> [ <>, - serialize_properties(Properties, Ver), + serialize_properties(Properties, Ver, StrictMode), serialize_reason_codes(ReasonCodes) ]; -serialize_variable(#mqtt_packet_disconnect{}, Ver) when +serialize_variable(#mqtt_packet_disconnect{}, Ver, _StrictMode) when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> <<>>; @@ -932,110 +950,115 @@ serialize_variable( reason_code = ReasonCode, properties = Properties }, - Ver = ?MQTT_PROTO_V5 + Ver = ?MQTT_PROTO_V5, + StrictMode ) -> - [ReasonCode, serialize_properties(Properties, Ver)]; -serialize_variable(#mqtt_packet_disconnect{}, _Ver) -> + [ReasonCode, serialize_properties(Properties, Ver, StrictMode)]; +serialize_variable(#mqtt_packet_disconnect{}, _Ver, _StrictMode) -> <<>>; serialize_variable( #mqtt_packet_auth{ reason_code = ReasonCode, properties = Properties }, - Ver = ?MQTT_PROTO_V5 + Ver = ?MQTT_PROTO_V5, + StrictMode ) -> - [ReasonCode, serialize_properties(Properties, Ver)]; -serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) -> + [ReasonCode, serialize_properties(Properties, Ver, StrictMode)]; +serialize_variable(PacketId, ?MQTT_PROTO_V3, _StrictMode) when is_integer(PacketId) -> <>; -serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) -> +serialize_variable(PacketId, ?MQTT_PROTO_V4, _StrictMode) when is_integer(PacketId) -> <>; -serialize_variable(undefined, _Ver) -> +serialize_variable(undefined, _Ver, _StrictMode) -> <<>>. serialize_payload(undefined) -> <<>>; serialize_payload(Bin) -> Bin. -serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 -> +serialize_properties(_Props, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> <<>>; -serialize_properties(Props, ?MQTT_PROTO_V5) -> - serialize_properties(Props). +serialize_properties(Props, ?MQTT_PROTO_V5, StrictMode) -> + serialize_properties(Props, StrictMode). -serialize_properties(undefined) -> +serialize_properties(undefined, _StrictMode) -> <<0>>; -serialize_properties(Props) when map_size(Props) == 0 -> +serialize_properties(Props, _StrictMode) when map_size(Props) == 0 -> <<0>>; -serialize_properties(Props) when is_map(Props) -> - Bin = <<<<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props)>>, +serialize_properties(Props, StrictMode) when is_map(Props) -> + Bin = << + <<(serialize_property(Prop, Val, StrictMode))/binary>> + || {Prop, Val} <- maps:to_list(Props) + >>, [serialize_variable_byte_integer(byte_size(Bin)), Bin]. -serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined -> +serialize_property(_, Disabled, _StrictMode) when Disabled =:= disabled; Disabled =:= undefined -> <<>>; -serialize_property(internal_extra, _) -> +serialize_property(internal_extra, _, _StrictMode) -> <<>>; -serialize_property('Payload-Format-Indicator', Val) -> +serialize_property('Payload-Format-Indicator', Val, _StrictMode) -> <<16#01, Val>>; -serialize_property('Message-Expiry-Interval', Val) -> +serialize_property('Message-Expiry-Interval', Val, _StrictMode) -> <<16#02, Val:32/big>>; -serialize_property('Content-Type', Val) -> - <<16#03, (serialize_utf8_string(Val))/binary>>; -serialize_property('Response-Topic', Val) -> - <<16#08, (serialize_utf8_string(Val))/binary>>; -serialize_property('Correlation-Data', Val) -> +serialize_property('Content-Type', Val, StrictMode) -> + <<16#03, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Response-Topic', Val, StrictMode) -> + <<16#08, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Correlation-Data', Val, _StrictMode) -> <<16#09, (byte_size(Val)):16, Val/binary>>; -serialize_property('Subscription-Identifier', Val) -> +serialize_property('Subscription-Identifier', Val, _StrictMode) -> <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; -serialize_property('Session-Expiry-Interval', Val) -> +serialize_property('Session-Expiry-Interval', Val, _StrictMode) -> <<16#11, Val:32/big>>; -serialize_property('Assigned-Client-Identifier', Val) -> - <<16#12, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Keep-Alive', Val) -> +serialize_property('Assigned-Client-Identifier', Val, StrictMode) -> + <<16#12, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Server-Keep-Alive', Val, _StrictMode) -> <<16#13, Val:16/big>>; -serialize_property('Authentication-Method', Val) -> - <<16#15, (serialize_utf8_string(Val))/binary>>; -serialize_property('Authentication-Data', Val) -> +serialize_property('Authentication-Method', Val, StrictMode) -> + <<16#15, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Authentication-Data', Val, _StrictMode) -> <<16#16, (iolist_size(Val)):16, Val/binary>>; -serialize_property('Request-Problem-Information', Val) -> +serialize_property('Request-Problem-Information', Val, _StrictMode) -> <<16#17, Val>>; -serialize_property('Will-Delay-Interval', Val) -> +serialize_property('Will-Delay-Interval', Val, _StrictMode) -> <<16#18, Val:32/big>>; -serialize_property('Request-Response-Information', Val) -> +serialize_property('Request-Response-Information', Val, _StrictMode) -> <<16#19, Val>>; -serialize_property('Response-Information', Val) -> - <<16#1A, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Reference', Val) -> - <<16#1C, (serialize_utf8_string(Val))/binary>>; -serialize_property('Reason-String', Val) -> - <<16#1F, (serialize_utf8_string(Val))/binary>>; -serialize_property('Receive-Maximum', Val) -> +serialize_property('Response-Information', Val, StrictMode) -> + <<16#1A, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Server-Reference', Val, StrictMode) -> + <<16#1C, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Reason-String', Val, StrictMode) -> + <<16#1F, (serialize_utf8_string(Val, StrictMode))/binary>>; +serialize_property('Receive-Maximum', Val, _StrictMode) -> <<16#21, Val:16/big>>; -serialize_property('Topic-Alias-Maximum', Val) -> +serialize_property('Topic-Alias-Maximum', Val, _StrictMode) -> <<16#22, Val:16/big>>; -serialize_property('Topic-Alias', Val) -> +serialize_property('Topic-Alias', Val, _StrictMode) -> <<16#23, Val:16/big>>; -serialize_property('Maximum-QoS', Val) -> +serialize_property('Maximum-QoS', Val, _StrictMode) -> <<16#24, Val>>; -serialize_property('Retain-Available', Val) -> +serialize_property('Retain-Available', Val, _StrictMode) -> <<16#25, Val>>; -serialize_property('User-Property', {Key, Val}) -> - <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>; -serialize_property('User-Property', Props) when is_list(Props) -> +serialize_property('User-Property', {Key, Val}, StrictMode) -> + <<16#26, (serialize_utf8_pair(Key, Val, StrictMode))/binary>>; +serialize_property('User-Property', Props, StrictMode) when is_list(Props) -> << - <<(serialize_property('User-Property', {Key, Val}))/binary>> + <<(serialize_property('User-Property', {Key, Val}, StrictMode))/binary>> || {Key, Val} <- Props >>; -serialize_property('Maximum-Packet-Size', Val) -> +serialize_property('Maximum-Packet-Size', Val, _StrictMode) -> <<16#27, Val:32/big>>; -serialize_property('Wildcard-Subscription-Available', Val) -> +serialize_property('Wildcard-Subscription-Available', Val, _StrictMode) -> <<16#28, Val>>; -serialize_property('Subscription-Identifier-Available', Val) -> +serialize_property('Subscription-Identifier-Available', Val, _StrictMode) -> <<16#29, Val>>; -serialize_property('Shared-Subscription-Available', Val) -> +serialize_property('Shared-Subscription-Available', Val, _StrictMode) -> <<16#2A, Val>>. -serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> +serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5, StrictMode) -> << << - (serialize_utf8_string(Topic))/binary, + (serialize_utf8_string(Topic, StrictMode))/binary, ?RESERVED:2, Rh:2, (flag(Rap)):1, @@ -1044,37 +1067,42 @@ serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> >> || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters >>; -serialize_topic_filters(subscribe, TopicFilters, _Ver) -> +serialize_topic_filters(subscribe, TopicFilters, _Ver, StrictMode) -> << - <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> + <<(serialize_utf8_string(Topic, StrictMode))/binary, ?RESERVED:6, QoS:2>> || {Topic, #{qos := QoS}} <- TopicFilters >>; -serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> - <<<<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters>>. +serialize_topic_filters(unsubscribe, TopicFilters, _Ver, StrictMode) -> + <<<<(serialize_utf8_string(Topic, StrictMode))/binary>> || Topic <- TopicFilters>>. serialize_reason_codes(undefined) -> <<>>; serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> <<<> || Code <- ReasonCodes>>. -serialize_utf8_pair({Name, Value}) -> - <<(serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary>>. +serialize_utf8_pair(Name, Value, StrictMode) -> + << + (serialize_utf8_string(Name, StrictMode))/binary, + (serialize_utf8_string(Value, StrictMode))/binary + >>. serialize_binary_data(Bin) -> [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. -serialize_utf8_string(undefined, false) -> +serialize_utf8_string(undefined, false, _StrictMode) -> ?SERIALIZE_ERR(utf8_string_undefined); -serialize_utf8_string(undefined, true) -> +serialize_utf8_string(undefined, true, _StrictMode) -> <<>>; -serialize_utf8_string(String, _AllowNull) -> - serialize_utf8_string(String). +serialize_utf8_string(String, _AllowNull, StrictMode) -> + serialize_utf8_string(String, StrictMode). -serialize_utf8_string(String) -> +serialize_utf8_string(String, true) -> StringBin = unicode:characters_to_binary(String), - Len = byte_size(StringBin), + serialize_utf8_string(StringBin, false); +serialize_utf8_string(String, false) -> + Len = byte_size(String), true = (Len =< 16#ffff), - <>. + <>. serialize_remaining_len(I) -> serialize_variable_byte_integer(I). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 038f3e98e..ea4c89321 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -302,7 +302,7 @@ websocket_init([Req, Opts]) -> max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_opts(), + Serialize = emqx_frame:initial_serialize_opts(FrameOpts), Channel = emqx_channel:init(ConnInfo, Opts), GcState = get_force_gc(Zone), StatsTimer = get_stats_enable(Zone), diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 7fffa3374..ae927d570 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -336,6 +336,17 @@ t_handle_incoming(_) -> ), ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())). +t_handle_outing_non_utf8_topic(_) -> + Topic = <<"测试"/utf16>>, + Publish = ?PUBLISH_PACKET(0, Topic, 1), + StrictOff = #{version => 5, max_size => 16#FFFF, strict_mode => false}, + StOff = st(#{serialize => StrictOff}), + OffResult = emqx_connection:handle_outgoing(Publish, StOff), + ?assertMatch(ok, OffResult), + StrictOn = #{version => 5, max_size => 16#FFFF, strict_mode => true}, + StOn = st(#{serialize => StrictOn}), + ?assertError(frame_serialize_error, emqx_connection:handle_outgoing(Publish, StOn)). + t_with_channel(_) -> State = st(), ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 59f201864..48303d278 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -540,7 +540,7 @@ t_parse_incoming_frame_error(_) -> t_handle_incomming_frame_error(_) -> FrameError = {frame_error, bad_qos}, - Serialize = emqx_frame:serialize_fun(#{version => 5, max_size => 16#FFFF}), + Serialize = emqx_frame:serialize_fun(#{version => 5, max_size => 16#FFFF, strict_mode => false}), {[{close, bad_qos}], _St} = ?ws_conn:handle_incoming(FrameError, st(#{serialize => Serialize})). % ?assertEqual(<<224,2,129,0>>, iolist_to_binary(IoData)). diff --git a/changes/ce/fix-12944.en.md b/changes/ce/fix-12944.en.md new file mode 100644 index 000000000..c72d9f2cc --- /dev/null +++ b/changes/ce/fix-12944.en.md @@ -0,0 +1 @@ +Fixed crash on non-UTF8 client IDs with strict_mode=false.