refactor(emqx_frame): log frame parse error with more context
This commit is contained in:
parent
1509907744
commit
9d0ef5de6b
|
@ -542,4 +542,7 @@
|
||||||
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
||||||
-define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
|
-define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
|
||||||
|
|
||||||
|
-define(FRAME_ERROR(Reason), {frame_error, Reason}).
|
||||||
|
-define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_ERROR(Reason))).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -644,10 +644,21 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
NState = State#state{parse_state = NParseState},
|
NState = State#state{parse_state = NParseState},
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
parse_incoming(Rest, [Packet|Packets], NState)
|
||||||
catch
|
catch
|
||||||
error:Reason:Stk ->
|
throw : ?FRAME_ERROR(Reason) ->
|
||||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
?SLOG(info, #{ reason => Reason
|
||||||
[Reason, Stk, Data]),
|
, at_state => emqx_frame:describe_state(ParseState)
|
||||||
{[{frame_error, Reason}|Packets], State}
|
, input_bytes => Data
|
||||||
|
, parsed_packets => Packets
|
||||||
|
}),
|
||||||
|
{[{frame_error, Reason} | Packets], State};
|
||||||
|
error : Reason : Stacktrace ->
|
||||||
|
?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState)
|
||||||
|
, input_bytes => Data
|
||||||
|
, parsed_packets => Packets
|
||||||
|
, exception => Reason
|
||||||
|
, stacktrace => Stacktrace
|
||||||
|
}),
|
||||||
|
{[{frame_error, Reason} | Packets], State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-compile({inline, [next_incoming_msgs/1]}).
|
-compile({inline, [next_incoming_msgs/1]}).
|
||||||
|
|
|
@ -34,6 +34,10 @@
|
||||||
, serialize/2
|
, serialize/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
|
||||||
|
-export([ describe_state/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export_type([ options/0
|
-export_type([ options/0
|
||||||
, parse_state/0
|
, parse_state/0
|
||||||
, parse_result/0
|
, parse_result/0
|
||||||
|
@ -47,7 +51,9 @@
|
||||||
version => emqx_types:proto_ver()
|
version => emqx_types:proto_ver()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(parse_state() :: {none, options()} | {cont_state(), options()}).
|
-define(NONE(Options), {none, Options}).
|
||||||
|
|
||||||
|
-type(parse_state() :: ?NONE(options()) | {cont_state(), options()}).
|
||||||
|
|
||||||
-type(parse_result() :: {more, parse_state()}
|
-type(parse_result() :: {more, parse_state()}
|
||||||
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
||||||
|
@ -61,27 +67,42 @@
|
||||||
|
|
||||||
-type(serialize_opts() :: options()).
|
-type(serialize_opts() :: options()).
|
||||||
|
|
||||||
-define(none(Options), {none, Options}).
|
|
||||||
|
|
||||||
-define(DEFAULT_OPTIONS,
|
-define(DEFAULT_OPTIONS,
|
||||||
#{strict_mode => false,
|
#{strict_mode => false,
|
||||||
max_size => ?MAX_PACKET_SIZE,
|
max_size => ?MAX_PACKET_SIZE,
|
||||||
version => ?MQTT_PROTO_V4
|
version => ?MQTT_PROTO_V4
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)).
|
||||||
|
|
||||||
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
||||||
|
|
||||||
|
%% @doc Describe state for logging.
|
||||||
|
describe_state(?NONE(_Opts)) -> <<"clean">>;
|
||||||
|
describe_state({{len, _}, _Opts}) -> <<"parsing_varint_length">>;
|
||||||
|
describe_state({{body, State}, _Opts}) ->
|
||||||
|
#{ hdr := Hdr
|
||||||
|
, len := Len
|
||||||
|
} = State,
|
||||||
|
Desc = #{ parsed_header => Hdr
|
||||||
|
, expected_bytes => Len
|
||||||
|
},
|
||||||
|
case maps:get(rest, State, undefined) of
|
||||||
|
undefined -> Desc;
|
||||||
|
Body -> Desc#{received_bytes => body_bytes(Body)}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Init Parse State
|
%% Init Parse State
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(initial_parse_state() -> {none, options()}).
|
-spec(initial_parse_state() -> ?NONE(options())).
|
||||||
initial_parse_state() ->
|
initial_parse_state() ->
|
||||||
initial_parse_state(#{}).
|
initial_parse_state(#{}).
|
||||||
|
|
||||||
-spec(initial_parse_state(options()) -> {none, options()}).
|
-spec(initial_parse_state(options()) -> ?NONE(options())).
|
||||||
initial_parse_state(Options) when is_map(Options) ->
|
initial_parse_state(Options) when is_map(Options) ->
|
||||||
?none(maps:merge(?DEFAULT_OPTIONS, Options)).
|
?NONE(maps:merge(?DEFAULT_OPTIONS, Options)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Parse MQTT Frame
|
%% Parse MQTT Frame
|
||||||
|
@ -92,10 +113,10 @@ parse(Bin) ->
|
||||||
parse(Bin, initial_parse_state()).
|
parse(Bin, initial_parse_state()).
|
||||||
|
|
||||||
-spec(parse(binary(), parse_state()) -> parse_result()).
|
-spec(parse(binary(), parse_state()) -> parse_result()).
|
||||||
parse(<<>>, {none, Options}) ->
|
parse(<<>>, ?NONE(Options)) ->
|
||||||
{more, {none, Options}};
|
{more, ?NONE(Options)};
|
||||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
{none, Options = #{strict_mode := StrictMode}}) ->
|
?NONE(Options = #{strict_mode := StrictMode})) ->
|
||||||
%% Validate header if strict mode.
|
%% Validate header if strict mode.
|
||||||
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
||||||
Header = #mqtt_packet_header{type = Type,
|
Header = #mqtt_packet_header{type = Type,
|
||||||
|
@ -123,14 +144,14 @@ parse_remaining_len(Rest, Header, Options) ->
|
||||||
|
|
||||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
|
||||||
when Length > MaxSize ->
|
when Length > MaxSize ->
|
||||||
error(frame_too_large);
|
?PARSE_ERR(frame_too_large);
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
|
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
|
||||||
%% Match DISCONNECT without payload
|
%% Match DISCONNECT without payload
|
||||||
parse_remaining_len(<<0:8, Rest/binary>>,
|
parse_remaining_len(<<0:8, Rest/binary>>,
|
||||||
Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
||||||
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
||||||
{ok, Packet, Rest, ?none(Options)};
|
{ok, Packet, Rest, ?NONE(Options)};
|
||||||
%% Match PINGREQ.
|
%% Match PINGREQ.
|
||||||
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 0, Options);
|
parse_frame(Rest, Header, 0, Options);
|
||||||
|
@ -139,14 +160,14 @@ parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 2, Options);
|
parse_frame(Rest, Header, 2, Options);
|
||||||
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
||||||
when Multiplier > 2097152 ->
|
when Multiplier > 2097152 ->
|
||||||
error(malformed_variable_byte_integer);
|
?PARSE_ERR(malformed_variable_byte_integer);
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
||||||
Options = #{max_size := MaxSize}) ->
|
Options = #{max_size := MaxSize}) ->
|
||||||
FrameLen = Value + Len * Multiplier,
|
FrameLen = Value + Len * Multiplier,
|
||||||
case FrameLen > MaxSize of
|
case FrameLen > MaxSize of
|
||||||
true -> error(frame_too_large);
|
true -> ?PARSE_ERR(frame_too_large);
|
||||||
false -> parse_frame(Rest, Header, FrameLen, Options)
|
false -> parse_frame(Rest, Header, FrameLen, Options)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -165,18 +186,18 @@ flatten_body(Body) when is_binary(Body) -> Body;
|
||||||
flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
|
flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
|
||||||
|
|
||||||
parse_frame(Body, Header, 0, Options) ->
|
parse_frame(Body, Header, 0, Options) ->
|
||||||
{ok, packet(Header), flatten_body(Body), ?none(Options)};
|
{ok, packet(Header), flatten_body(Body), ?NONE(Options)};
|
||||||
parse_frame(Body, Header, Length, Options) ->
|
parse_frame(Body, Header, Length, Options) ->
|
||||||
case body_bytes(Body) >= Length of
|
case body_bytes(Body) >= Length of
|
||||||
true ->
|
true ->
|
||||||
<<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
|
<<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
|
||||||
case parse_packet(Header, FrameBin, Options) of
|
case parse_packet(Header, FrameBin, Options) of
|
||||||
{Variable, Payload} ->
|
{Variable, Payload} ->
|
||||||
{ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
|
{ok, packet(Header, Variable, Payload), Rest, ?NONE(Options)};
|
||||||
Variable = #mqtt_packet_connect{proto_ver = Ver} ->
|
Variable = #mqtt_packet_connect{proto_ver = Ver} ->
|
||||||
{ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})};
|
{ok, packet(Header, Variable), Rest, ?NONE(Options#{version := Ver})};
|
||||||
Variable ->
|
Variable ->
|
||||||
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
{ok, packet(Header, Variable), Rest, ?NONE(Options)}
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
{more, {{body, #{hdr => Header,
|
{more, {{body, #{hdr => Header,
|
||||||
|
@ -719,7 +740,7 @@ serialize_binary_data(Bin) ->
|
||||||
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
||||||
|
|
||||||
serialize_utf8_string(undefined, false) ->
|
serialize_utf8_string(undefined, false) ->
|
||||||
error(utf8_string_undefined);
|
?PARSE_ERR(utf8_string_undefined);
|
||||||
serialize_utf8_string(undefined, true) ->
|
serialize_utf8_string(undefined, true) ->
|
||||||
<<>>;
|
<<>>;
|
||||||
serialize_utf8_string(String, _AllowNull) ->
|
serialize_utf8_string(String, _AllowNull) ->
|
||||||
|
@ -767,13 +788,13 @@ validate_header(?PINGREQ, 0, 0, 0) -> ok;
|
||||||
validate_header(?PINGRESP, 0, 0, 0) -> ok;
|
validate_header(?PINGRESP, 0, 0, 0) -> ok;
|
||||||
validate_header(?DISCONNECT, 0, 0, 0) -> ok;
|
validate_header(?DISCONNECT, 0, 0, 0) -> ok;
|
||||||
validate_header(?AUTH, 0, 0, 0) -> ok;
|
validate_header(?AUTH, 0, 0, 0) -> ok;
|
||||||
validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header).
|
validate_header(_Type, _Dup, _QoS, _Rt) -> ?PARSE_ERR(bad_frame_header).
|
||||||
|
|
||||||
-compile({inline, [validate_packet_id/1]}).
|
-compile({inline, [validate_packet_id/1]}).
|
||||||
validate_packet_id(0) -> error(bad_packet_id);
|
validate_packet_id(0) -> ?PARSE_ERR(bad_packet_id);
|
||||||
validate_packet_id(_) -> ok.
|
validate_packet_id(_) -> ok.
|
||||||
|
|
||||||
validate_subqos([3|_]) -> error(bad_subqos);
|
validate_subqos([3|_]) -> ?PARSE_ERR(bad_subqos);
|
||||||
validate_subqos([_|T]) -> validate_subqos(T);
|
validate_subqos([_|T]) -> validate_subqos(T);
|
||||||
validate_subqos([]) -> ok.
|
validate_subqos([]) -> ok.
|
||||||
|
|
||||||
|
|
|
@ -547,9 +547,19 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
|
||||||
NState = State#state{parse_state = NParseState},
|
NState = State#state{parse_state = NParseState},
|
||||||
parse_incoming(Rest, postpone({incoming, Packet}, NState))
|
parse_incoming(Rest, postpone({incoming, Packet}, NState))
|
||||||
catch
|
catch
|
||||||
error:Reason:Stk ->
|
throw : ?FRAME_ERROR(Reason) ->
|
||||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p",
|
?SLOG(info, #{ reason => Reason
|
||||||
[Reason, Stk, Data]),
|
, at_state => emqx_frame:describe_state(ParseState)
|
||||||
|
, input_bytes => Data
|
||||||
|
}),
|
||||||
|
FrameError = {frame_error, Reason},
|
||||||
|
postpone({incoming, FrameError}, State);
|
||||||
|
error : Reason : Stacktrace ->
|
||||||
|
?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState)
|
||||||
|
, input_bytes => Data
|
||||||
|
, exception => Reason
|
||||||
|
, stacktrace => Stacktrace
|
||||||
|
}),
|
||||||
FrameError = {frame_error, Reason},
|
FrameError = {frame_error, Reason},
|
||||||
postpone({incoming, FrameError}, State)
|
postpone({incoming, FrameError}, State)
|
||||||
end.
|
end.
|
||||||
|
@ -791,4 +801,4 @@ get_ws_opts(Type, Listener, Key) ->
|
||||||
emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
|
emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
|
||||||
|
|
||||||
get_active_n(Type, Listener) ->
|
get_active_n(Type, Listener) ->
|
||||||
emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]).
|
emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]).
|
||||||
|
|
|
@ -22,7 +22,9 @@
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
|
|
||||||
|
-define(ASSERT_FRAME_THROW(Reason, Expr),
|
||||||
|
?assertThrow(?FRAME_ERROR(Reason), Expr)).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[{group, parse},
|
[{group, parse},
|
||||||
|
@ -113,7 +115,7 @@ init_per_group(_Group, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_parse_cont(_) ->
|
t_parse_cont(_) ->
|
||||||
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
|
||||||
|
@ -127,15 +129,15 @@ t_parse_cont(_) ->
|
||||||
|
|
||||||
t_parse_frame_too_large(_) ->
|
t_parse_frame_too_large(_) ->
|
||||||
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
|
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
|
||||||
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 256})),
|
?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 256})),
|
||||||
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
t_parse_frame_malformed_variable_byte_integer(_) ->
|
t_parse_frame_malformed_variable_byte_integer(_) ->
|
||||||
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
|
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 6) >>,
|
||||||
ParseState = emqx_frame:initial_parse_state(#{}),
|
ParseState = emqx_frame:initial_parse_state(#{}),
|
||||||
?catch_error(malformed_variable_byte_integer,
|
?ASSERT_FRAME_THROW(malformed_variable_byte_integer,
|
||||||
emqx_frame:parse(MalformedPayload, ParseState)).
|
emqx_frame:parse(MalformedPayload, ParseState)).
|
||||||
|
|
||||||
t_serialize_parse_v3_connect(_) ->
|
t_serialize_parse_v3_connect(_) ->
|
||||||
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
|
@ -329,7 +331,7 @@ t_serialize_parse_qos1_publish(_) ->
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))),
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
|
_ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
|
||||||
|
|
||||||
|
@ -340,7 +342,7 @@ t_serialize_parse_qos2_publish(_) ->
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))),
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
|
_ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
|
||||||
|
|
||||||
|
@ -360,7 +362,7 @@ t_serialize_parse_puback(_) ->
|
||||||
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
|
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)),
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))),
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}).
|
?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}).
|
||||||
|
|
||||||
|
@ -381,7 +383,7 @@ t_serialize_parse_pubrec(_) ->
|
||||||
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
|
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)),
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))),
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}).
|
?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}).
|
||||||
|
|
||||||
|
@ -397,11 +399,11 @@ t_serialize_parse_pubrel(_) ->
|
||||||
%% PUBREL with bad qos 0
|
%% PUBREL with bad qos 0
|
||||||
Bin0 = <<6:4,0:4,2,0,1>>,
|
Bin0 = <<6:4,0:4,2,0,1>>,
|
||||||
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}),
|
?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
|
||||||
|
|
||||||
t_serialize_parse_pubrel_v5(_) ->
|
t_serialize_parse_pubrel_v5(_) ->
|
||||||
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -415,7 +417,7 @@ t_serialize_parse_pubcomp(_) ->
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}),
|
?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
|
||||||
|
|
||||||
t_serialize_parse_pubcomp_v5(_) ->
|
t_serialize_parse_pubcomp_v5(_) ->
|
||||||
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -434,12 +436,12 @@ t_serialize_parse_subscribe(_) ->
|
||||||
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_to_packet(Bin0, #{strict_mode => false}),
|
_ = parse_to_packet(Bin0, #{strict_mode => false}),
|
||||||
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}),
|
_ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))),
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))),
|
||||||
?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))).
|
?ASSERT_FRAME_THROW(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))).
|
||||||
|
|
||||||
t_serialize_parse_subscribe_v5(_) ->
|
t_serialize_parse_subscribe_v5(_) ->
|
||||||
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}},
|
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}},
|
||||||
|
@ -453,7 +455,7 @@ t_serialize_parse_suback(_) ->
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}),
|
_ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))).
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))).
|
||||||
|
|
||||||
t_serialize_parse_suback_v5(_) ->
|
t_serialize_parse_suback_v5(_) ->
|
||||||
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
|
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
|
||||||
|
@ -471,11 +473,11 @@ t_serialize_parse_unsubscribe(_) ->
|
||||||
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
||||||
Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||||
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
%% strict_mode = false
|
%% strict_mode = false
|
||||||
_ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}),
|
_ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}),
|
||||||
%% strict_mode = true
|
%% strict_mode = true
|
||||||
?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))).
|
?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))).
|
||||||
|
|
||||||
t_serialize_parse_unsubscribe_v5(_) ->
|
t_serialize_parse_unsubscribe_v5(_) ->
|
||||||
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
|
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
|
||||||
|
|
Loading…
Reference in New Issue