Merge pull request #13357 from JimMoen/fix-utf8-frame-error-connack

Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.

- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected.
- Otherwise **DONOT** send any CONNACK or DISCONNECT packet.
This commit is contained in:
JimMoen 2024-08-02 15:24:30 +08:00 committed by GitHub
commit 09ec31908b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 318 additions and 144 deletions

View File

@ -683,6 +683,7 @@ end).
-define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_PARSE_ERROR, frame_parse_error).
-define(FRAME_SERIALIZE_ERROR, frame_serialize_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})). -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).

View File

@ -145,7 +145,9 @@
-type replies() :: emqx_types:packet() | reply() | [reply()]. -type replies() :: emqx_types:packet() | reply() | [reply()].
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}). -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
((ConnState == connected) orelse (ConnState == reauthenticating))
).
-define(IS_COMMON_SESSION_TIMER(N), -define(IS_COMMON_SESSION_TIMER(N),
((N == retry_delivery) orelse (N == expire_awaiting_rel)) ((N == retry_delivery) orelse (N == expire_awaiting_rel))
). ).
@ -333,7 +335,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
| {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}. | {shutdown, Reason :: term(), replies(), channel()}.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
-> ->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) -> handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
@ -563,29 +565,8 @@ handle_in(
process_disconnect(ReasonCode, Properties, NChannel); process_disconnect(ReasonCode, Properties, NChannel);
handle_in(?AUTH_PACKET(), Channel) -> handle_in(?AUTH_PACKET(), Channel) ->
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> handle_in({frame_error, Reason}, Channel) ->
shutdown(shutdown_count(frame_error, Reason), Channel); handle_frame_error(Reason, Channel);
handle_in(
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in(
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel};
handle_in(Packet, Channel) -> handle_in(Packet, Channel) ->
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}), ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@ -1017,6 +998,68 @@ not_nacked({deliver, _Topic, Msg}) ->
true true
end. end.
%%--------------------------------------------------------------------
%% Handle Frame Error
%%--------------------------------------------------------------------
handle_frame_error(
Reason = #{cause := frame_too_large},
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
ShutdownCount = shutdown_count(frame_error, Reason),
case proto_ver(Reason, ConnInfo) of
?MQTT_PROTO_V5 ->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
_ ->
shutdown(ShutdownCount, Channel)
end;
%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
%% otherwise DONOT send any CONNACK or DISCONNECT packet.
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
is_map(Reason) andalso
(ConnState == idle orelse ConnState == connecting)
->
ShutdownCount = shutdown_count(frame_error, Reason),
ProtoVer = proto_ver(Reason, ConnInfo),
NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
case ProtoVer of
?MQTT_PROTO_V5 ->
shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
_ ->
shutdown(ShutdownCount, NChannel)
end;
handle_frame_error(
Reason,
Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, Reason),
?CONNACK_PACKET(?RC_MALFORMED_PACKET),
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(
disconnect,
{?RC_MALFORMED_PACKET, Reason},
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packet %% Handle outgoing packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1285,7 +1328,7 @@ handle_info(
session = Session session = Session
} }
) when ) when
ConnState =:= connected orelse ConnState =:= reauthenticating ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
-> ->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
@ -2629,8 +2672,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
NAliases = maps:put(Topic, AliasId, Aliases), NAliases = maps:put(Topic, AliasId, Aliases),
TopicAliases#{outbound => NAliases}. TopicAliases#{outbound => NAliases}.
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}). -compile({inline, [reply/2, shutdown/2, shutdown/3]}).
reply(Reply, Channel) -> reply(Reply, Channel) ->
{reply, Reply, Channel}. {reply, Reply, Channel}.
@ -2666,13 +2708,13 @@ disconnect_and_shutdown(
?IS_MQTT_V5 = ?IS_MQTT_V5 =
#channel{conn_state = ConnState} #channel{conn_state = ConnState}
) when ) when
ConnState =:= connected orelse ConnState =:= reauthenticating ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
-> ->
NChannel = ensure_disconnected(Reason, Channel), NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 connected sessions %% mqtt v3/v4 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
-> ->
NChannel = ensure_disconnected(Reason, Channel), NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, NChannel); shutdown(Reason, Reply, NChannel);
@ -2715,6 +2757,13 @@ is_durable_session(#channel{session = Session}) ->
false false
end. end.
proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
ProtoVer;
proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
ProtoVer;
proto_ver(_, _) ->
?MQTT_PROTO_V4.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -782,7 +782,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data, input_bytes => Data,
parsed_packets => Packets parsed_packets => Packets
}), }),
{[{frame_error, Reason} | Packets], State}; NState = enrich_state(Reason, State),
{[{frame_error, Reason} | Packets], NState};
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?LOG(error, #{ ?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
@ -1204,6 +1205,12 @@ inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc), _ = emqx_pd:inc_counter(Key, Inc),
ok. ok.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
set_tcp_keepalive({quic, _Listener}) -> set_tcp_keepalive({quic, _Listener}) ->
ok; ok;
set_tcp_keepalive({Type, Id}) -> set_tcp_keepalive({Type, Id}) ->

View File

@ -266,28 +266,50 @@ packet(Header, Variable) ->
packet(Header, Variable, Payload) -> packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}. #mqtt_packet{header = Header, variable = Variable, payload = Payload}.
parse_connect(FrameBin, StrictMode) -> parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), {ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of %% No need to parse and check proto_ver if proto_name is invalid, check it first
<<"MQTT">> -> %% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2`
ok; _ = validate_proto_name(ProtoName),
<<"MQIsdp">> -> {IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
ok; NOptions = Options#{version => ProtoVer},
_ -> try
%% from spec: the server MAY send disconnect with reason code 0x84 do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
%% we chose to close socket because the client is likely not talking MQTT anyway catch
?PARSE_ERR(#{ throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
cause => invalid_proto_name, ?PARSE_ERR(
expected => <<"'MQTT' or 'MQIsdp'">>, ReasonM#{
received => ProtoName proto_ver => ProtoVer,
}) proto_name => ProtoName,
end, parse_state => ?NONE(NOptions)
parse_connect2(ProtoName, Rest, StrictMode). }
);
throw:{?FRAME_PARSE_ERROR, Reason} ->
?PARSE_ERR(
#{
cause => Reason,
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
)
end.
parse_connect2( do_parse_connect(
ProtoName, ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2, IsBridge,
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>, ProtoVer,
<<
UsernameFlagB:1,
PasswordFlagB:1,
WillRetainB:1,
WillQoS:2,
WillFlagB:1,
CleanStart:1,
Reserved:1,
KeepAlive:16/big,
Rest/binary
>>,
StrictMode StrictMode
) -> ) ->
_ = validate_connect_reserved(Reserved), _ = validate_connect_reserved(Reserved),
@ -302,14 +324,14 @@ parse_connect2(
UsernameFlag = bool(UsernameFlagB), UsernameFlag = bool(UsernameFlagB),
PasswordFlag = bool(PasswordFlagB) PasswordFlag = bool(PasswordFlagB)
), ),
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), {Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid), {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{ ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName, proto_name = ProtoName,
proto_ver = ProtoVer, proto_ver = ProtoVer,
%% For bridge mode, non-standard implementation %% For bridge mode, non-standard implementation
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
is_bridge = (BridgeTag =:= 8), is_bridge = IsBridge,
clean_start = bool(CleanStart), clean_start = bool(CleanStart),
will_flag = WillFlag, will_flag = WillFlag,
will_qos = WillQoS, will_qos = WillQoS,
@ -342,16 +364,16 @@ parse_connect2(
unexpected_trailing_bytes => size(Rest7) unexpected_trailing_bytes => size(Rest7)
}) })
end; end;
parse_connect2(_ProtoName, Bin, _StrictMode) -> do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
%% sent less than 32 bytes %% sent less than 24 bytes
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}). ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_packet( parse_packet(
#mqtt_packet_header{type = ?CONNECT}, #mqtt_packet_header{type = ?CONNECT},
FrameBin, FrameBin,
#{strict_mode := StrictMode} Options
) -> ) ->
parse_connect(FrameBin, StrictMode); parse_connect(FrameBin, Options);
parse_packet( parse_packet(
#mqtt_packet_header{type = ?CONNACK}, #mqtt_packet_header{type = ?CONNACK},
<<AckFlags:8, ReasonCode:8, Rest/binary>>, <<AckFlags:8, ReasonCode:8, Rest/binary>>,
@ -515,6 +537,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
parse_packet_id(_) -> parse_packet_id(_) ->
?PARSE_ERR(invalid_packet_id). ?PARSE_ERR(invalid_packet_id).
parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
{_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
parse_connect_proto_ver(Bin) ->
%% sent less than 1 bytes or empty
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
{#{}, Bin}; {#{}, Bin};
%% TODO: version mess? %% TODO: version mess?
@ -738,6 +766,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) ->
serialize_opts() -> serialize_opts() ->
?DEFAULT_OPTIONS. ?DEFAULT_OPTIONS.
serialize_opts(?NONE(Options)) ->
maps:merge(?DEFAULT_OPTIONS, Options);
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
#{version => ProtoVer, max_size => MaxSize}. #{version => ProtoVer, max_size => MaxSize}.
@ -1129,18 +1159,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
validate_subqos([_ | T]) -> validate_subqos(T); validate_subqos([_ | T]) -> validate_subqos(T);
validate_subqos([]) -> ok. validate_subqos([]) -> ok.
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
validate_proto_name(<<"MQTT">>) ->
ok;
validate_proto_name(<<"MQIsdp">>) ->
ok;
validate_proto_name(ProtoName) ->
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
}).
%% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3] %% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
-compile({inline, [validate_connect_reserved/1]}).
validate_connect_reserved(0) -> ok; validate_connect_reserved(0) -> ok;
validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag). validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
-compile({inline, [validate_connect_will/3]}).
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos); validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain); validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
validate_connect_will(_, _, _) -> ok. validate_connect_will(_, _, _) -> ok.
-compile({inline, [validate_connect_password_flag/4]}).
%% MQTT-v3.1 %% MQTT-v3.1
%% Username flag and password flag are not strongly related %% Username flag and password flag are not strongly related
%% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect %% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
@ -1155,6 +1201,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
validate_connect_password_flag(_, _, _, _) -> validate_connect_password_flag(_, _, _, _) ->
ok. ok.
-compile({inline, [bool/1]}).
bool(0) -> false; bool(0) -> false;
bool(1) -> true. bool(1) -> true.

View File

@ -62,7 +62,7 @@
streams := [{pid(), quicer:stream_handle()}], streams := [{pid(), quicer:stream_handle()}],
%% New stream opts %% New stream opts
stream_opts := map(), stream_opts := map(),
%% If conneciton is resumed from session ticket %% If connection is resumed from session ticket
is_resumed => boolean(), is_resumed => boolean(),
%% mqtt message serializer config %% mqtt message serializer config
serialize => undefined, serialize => undefined,
@ -70,8 +70,8 @@
}. }.
-type cb_ret() :: quicer_lib:cb_ret(). -type cb_ret() :: quicer_lib:cb_ret().
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked %% @doc Data streams initializations are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit conneciton. %% for the activation from control stream after it is accepted as a legit connection.
%% For security, the initial number of allowed data streams from client should be limited by %% For security, the initial number of allowed data streams from client should be limited by
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count` %% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
-spec activate_data_streams(pid(), { -spec activate_data_streams(pid(), {
@ -80,7 +80,7 @@
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback %% @doc connection owner init callback
-spec init(map()) -> {ok, cb_state()}. -spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)}); init(S#{stream_opts := maps:from_list(SOpts)});

View File

@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection %% TODO: should not close the ws connection
?LOG(error, #{msg => "unexpected_frame", frame => Frame}), ?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State). shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) -> websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State); handle_call(From, Req, State);
websocket_info({cast, rate_limit}, State) -> websocket_info({cast, rate_limit}, State) ->
@ -725,7 +726,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data input_bytes => Data
}), }),
FrameError = {frame_error, Reason}, FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State}; NState = enrich_state(Reason, State),
{[{incoming, FrameError} | Packets], NState};
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?LOG(error, #{ ?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
@ -1059,6 +1061,13 @@ check_max_connection(Type, Listener) ->
{denny, Reason} {denny, Reason}
end end
end. end.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -414,24 +414,32 @@ t_handle_in_auth(_) ->
emqx_channel:handle_in(?AUTH_PACKET(), Channel). emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) -> t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}), IdleChannelV5 = channel(#{conn_state => idle}),
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} = %% no CONNACK packet for v4
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel), ?assertMatch(
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
)
),
ConnectingChan = channel(#{conn_state => connecting}), ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
{shutdown, ?assertMatch(
#{ {shutdown,
shutdown_count := frame_too_large, #{
cause := frame_too_large, shutdown_count := frame_too_large,
limit := 100, cause := frame_too_large,
received := 101 limit := 100,
}, received := 101
ConnackPacket, },
_} = ConnackPacket, _},
emqx_channel:handle_in( emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}}, {frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
ConnectingChan ConnectingChan
), )
),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE), DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
ConnectedChan = channel(#{conn_state => connected}), ConnectedChan = channel(#{conn_state => connected}),
?assertMatch( ?assertMatch(

View File

@ -63,6 +63,7 @@ groups() ->
t_parse_malformed_properties, t_parse_malformed_properties,
t_malformed_connect_header, t_malformed_connect_header,
t_malformed_connect_data, t_malformed_connect_data,
t_malformed_connect_data_proto_ver,
t_reserved_connect_flag, t_reserved_connect_flag,
t_invalid_clientid, t_invalid_clientid,
t_undefined_password, t_undefined_password,
@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). ?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
%% TODO: parse v3 with 0 length clientid
t_serialize_parse_v3_connect(_) -> t_serialize_parse_v3_connect(_) ->
Bin = Bin =
<<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117, <<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
header = #mqtt_packet_header{type = ?CONNECT}, header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{ variable = #mqtt_packet_connect{
clientid = <<"C_00:0C:29:2B:77:52">>, clientid = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03, proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>, proto_name = <<"MQIsdp">>,
is_bridge = true, is_bridge = true,
will_retain = true, will_retain = true,
@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
). ).
t_malformed_connect_data(_) -> t_malformed_connect_data(_) ->
ProtoNameWithLen = <<0, 6, "MQIsdp">>,
ConnectFlags = <<2#00000000>>,
ClientIdwithLen = <<0, 1, "a">>,
UnexpectedRestBin = <<0, 1, 2>>,
?ASSERT_FRAME_THROW( ?ASSERT_FRAME_THROW(
#{cause := malformed_connect, unexpected_trailing_bytes := _}, #{cause := malformed_connect, unexpected_trailing_bytes := 3},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>) emqx_frame:parse(
<<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
ClientIdwithLen/binary, UnexpectedRestBin/binary>>
)
).
t_malformed_connect_data_proto_ver(_) ->
Proto3NameWithLen = <<0, 6, "MQIsdp">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
),
ProtoNameWithLen = <<0, 4, "MQTT">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
). ).
t_reserved_connect_flag(_) -> t_reserved_connect_flag(_) ->
?assertException( ?assertException(
throw, throw,
{frame_parse_error, reserved_connect_flag}, {frame_parse_error, #{
cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
}},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>) emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
). ).
@ -726,7 +750,7 @@ t_undefined_password(_) ->
}, },
variable = #mqtt_packet_connect{ variable = #mqtt_packet_connect{
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
proto_ver = 4, proto_ver = ?MQTT_PROTO_V4,
is_bridge = false, is_bridge = false,
clean_start = true, clean_start = true,
will_flag = false, will_flag = false,
@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>, 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
?assertException( ?assertException(
throw, throw,
{frame_parse_error, invalid_will_retain}, {frame_parse_error, #{
cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBin) emqx_frame:parse(ConnectBin)
), ),
ok. ok.
@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
), ),
?assertException( ?assertException(
throw, throw,
{frame_parse_error, invalid_will_qos}, {frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1)) emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
), ),
?assertException( ?assertException(
throw, throw,
{frame_parse_error, invalid_will_qos}, {frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2)) emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
), ),
?assertException( ?assertException(
throw, throw,
{frame_parse_error, invalid_will_qos}, {frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3)) emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
), ),
?assertException( ?assertException(
throw, throw,
{frame_parse_error, invalid_will_qos}, {frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3)) emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
), ),
ok. ok.

View File

@ -0,0 +1,4 @@
Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.
- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected.
- Otherwise **DONOT** send any CONNACK or DISCONNECT packet.

View File

@ -1,62 +1,77 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
[ [
{ {
elvis, elvis,
[ [
{config, {config, [
[ #{
#{dirs => ["src", "apps/**/src"], dirs => ["src", "apps/**/src"],
filter => "*.erl", filter => "*.erl",
ruleset => erl_files, ruleset => erl_files,
rules => [ rules => [
{elvis_style, macro_names, disable}, {elvis_style, param_pattern_matching, disable},
{elvis_style, function_naming_convention, disable}, {elvis_style, macro_names, disable},
{elvis_style, state_record_and_type, disable}, {elvis_style, function_naming_convention, disable},
{elvis_style, no_common_caveats_call, #{}}, {elvis_style, state_record_and_type, disable},
{elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal} {elvis_style, no_common_caveats_call, #{}},
, {ct, print} {elvis_style, no_debug_call, #{
]}}, debug_functions => [
{elvis_style, operator_spaces, #{rules => [{right, "|"}, {ct, pal},
{left, "|"}, {ct, print}
{right, "||"}, ]
{left, "||"}]}}, }},
{elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }}, {elvis_style, operator_spaces, #{
{elvis_style, god_modules, #{limit => 100}}, rules => [
{elvis_text_style, line_length, #{ limit => 120 % trust erlfmt {right, "|"},
, skip_comments => false {left, "|"},
}} {right, "||"},
] {left, "||"}
}, ]
#{dirs => ["test", "apps/**/test"], }},
filter => "*.erl", {elvis_style, dont_repeat_yourself, #{min_complexity => 20}},
rules => [ {elvis_style, god_modules, #{limit => 100}},
{elvis_text_style, line_length, #{ limit => 120 % trust erlfmt
, skip_comments => false {elvis_text_style, line_length, #{
}}, limit => 120,
{elvis_style, dont_repeat_yourself, #{ min_complexity => 100 }}, skip_comments => false
{elvis_style, nesting_level, #{ level => 6 }} }}
] ]
}, },
#{dirs => ["apps/emqx_rule_engine/src"], #{
filter => "*_rule_funcs.erl", dirs => ["test", "apps/**/test"],
rules => [ filter => "*.erl",
{elvis_style, god_modules, disable} rules => [
] {elvis_text_style, line_length, #{
}, limit => 120,
#{dirs => ["."], skip_comments => false
filter => "Makefile", }},
ruleset => makefiles {elvis_style, dont_repeat_yourself, #{min_complexity => 100}},
}, {elvis_style, nesting_level, #{level => 6}}
#{dirs => ["."], ]
filter => "rebar.config", },
ruleset => rebar_config #{
}, dirs => ["apps/emqx_rule_engine/src"],
#{dirs => ["."], filter => "*_rule_funcs.erl",
filter => "elvis.config", rules => [
ruleset => elvis_config {elvis_style, god_modules, disable}
} ]
] },
#{
dirs => ["."],
filter => "Makefile",
ruleset => makefiles
},
#{
dirs => ["."],
filter => "rebar.config",
ruleset => rebar_config
},
#{
dirs => ["."],
filter => "elvis.config",
ruleset => elvis_config
}
]}
]
} }
]
}
]. ].