225 lines
9.4 KiB
Erlang
225 lines
9.4 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc MQTT Packet Parser
|
|
-module(emqttd_parser).
|
|
|
|
-author("Feng Lee <feng@emqtt.io>").
|
|
|
|
-include("emqttd.hrl").
|
|
|
|
-include("emqttd_protocol.hrl").
|
|
|
|
%% API
|
|
-export([initial_state/0, initial_state/1, parse/2]).
|
|
|
|
-type(max_packet_size() :: 1..?MAX_PACKET_SIZE).
|
|
|
|
-spec(initial_state() -> {none, max_packet_size()}).
|
|
initial_state() ->
|
|
initial_state(?MAX_PACKET_SIZE).
|
|
|
|
%% @doc Initialize a parser
|
|
-spec(initial_state(max_packet_size()) -> {none, max_packet_size()}).
|
|
initial_state(MaxSize) ->
|
|
{none, MaxSize}.
|
|
|
|
%% @doc Parse MQTT Packet
|
|
-spec(parse(binary(), {none, pos_integer()} | fun())
|
|
-> {ok, mqtt_packet()} | {error, any()} | {more, fun()}).
|
|
parse(<<>>, {none, MaxLen}) ->
|
|
{more, fun(Bin) -> parse(Bin, {none, MaxLen}) end};
|
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
|
|
parse_remaining_len(Rest, #mqtt_packet_header{type = Type,
|
|
dup = bool(Dup),
|
|
qos = fixqos(Type, QoS),
|
|
retain = bool(Retain)}, Limit);
|
|
parse(Bin, Cont) -> Cont(Bin).
|
|
|
|
parse_remaining_len(<<>>, Header, Limit) ->
|
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end};
|
|
parse_remaining_len(Rest, Header, Limit) ->
|
|
parse_remaining_len(Rest, Header, 1, 0, Limit).
|
|
|
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length, MaxLen)
|
|
when Length > MaxLen ->
|
|
{error, invalid_mqtt_frame_len};
|
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) ->
|
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end};
|
|
%% optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, _Limit) ->
|
|
parse_frame(Rest, Header, 2);
|
|
%% optimize: match PINGREQ...
|
|
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, _Limit) ->
|
|
parse_frame(Rest, Header, 0);
|
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) ->
|
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit);
|
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, MaxLen) ->
|
|
FrameLen = Value + Len * Multiplier,
|
|
if
|
|
FrameLen > MaxLen -> {error, invalid_mqtt_frame_len};
|
|
true -> parse_frame(Rest, Header, FrameLen)
|
|
end.
|
|
|
|
parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) ->
|
|
case {Type, Bin} of
|
|
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
{ProtoName, Rest1} = parse_utf(FrameBin),
|
|
%% Fix mosquitto bridge: 0x83, 0x84
|
|
<<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1,
|
|
<<UsernameFlag : 1,
|
|
PasswordFlag : 1,
|
|
WillRetain : 1,
|
|
WillQos : 2,
|
|
WillFlag : 1,
|
|
CleanSess : 1,
|
|
_Reserved : 1,
|
|
KeepAlive : 16/big,
|
|
Rest3/binary>> = Rest2,
|
|
{ClientId, Rest4} = parse_utf(Rest3),
|
|
{WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
|
|
{WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
|
|
{UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
|
|
{PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
|
|
case protocol_name_approved(ProtoVersion, ProtoName) of
|
|
true ->
|
|
wrap(Header,
|
|
#mqtt_packet_connect{
|
|
proto_ver = ProtoVersion,
|
|
proto_name = ProtoName,
|
|
will_retain = bool(WillRetain),
|
|
will_qos = WillQos,
|
|
will_flag = bool(WillFlag),
|
|
clean_sess = bool(CleanSess),
|
|
keep_alive = KeepAlive,
|
|
client_id = ClientId,
|
|
will_topic = WillTopic,
|
|
will_msg = WillMsg,
|
|
username = UserName,
|
|
password = PasssWord}, Rest);
|
|
false ->
|
|
{error, protocol_header_corrupt}
|
|
end;
|
|
%{?CONNACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
% <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin,
|
|
% wrap(Header, #mqtt_packet_connack{ack_flags = SP,
|
|
% return_code = ReturnCode }, Rest);
|
|
{?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
{TopicName, Rest1} = parse_utf(FrameBin),
|
|
{PacketId, Payload} = case Qos of
|
|
0 -> {undefined, Rest1};
|
|
_ -> <<Id:16/big, R/binary>> = Rest1,
|
|
{Id, R}
|
|
end,
|
|
wrap(Header, #mqtt_packet_publish{topic_name = TopicName,
|
|
packet_id = PacketId},
|
|
Payload, Rest);
|
|
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
<<PacketId:16/big>> = FrameBin,
|
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
{?PUBREC, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
<<PacketId:16/big>> = FrameBin,
|
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
%% 1 = Qos,
|
|
<<PacketId:16/big>> = FrameBin,
|
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
<<PacketId:16/big>> = FrameBin,
|
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
{?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
%% 1 = Qos,
|
|
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
|
TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
|
|
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
|
|
topic_table = TopicTable}, Rest);
|
|
%{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
% <<PacketId:16/big, Rest1/binary>> = FrameBin,
|
|
% wrap(Header, #mqtt_packet_suback{packet_id = PacketId,
|
|
% qos_table = parse_qos(Rest1, []) }, Rest);
|
|
{?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
%% 1 = Qos,
|
|
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
|
Topics = parse_topics(?UNSUBSCRIBE, Rest1, []),
|
|
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
|
|
topics = Topics}, Rest);
|
|
%{?UNSUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
% <<PacketId:16/big>> = FrameBin,
|
|
% wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest);
|
|
{?PINGREQ, Rest} ->
|
|
Length = 0,
|
|
wrap(Header, Rest);
|
|
%{?PINGRESP, Rest} ->
|
|
% Length = 0,
|
|
% wrap(Header, Rest);
|
|
{?DISCONNECT, Rest} ->
|
|
Length = 0,
|
|
wrap(Header, Rest);
|
|
{_, TooShortBin} ->
|
|
{more, fun(BinMore) ->
|
|
parse_frame(<<TooShortBin/binary, BinMore/binary>>,
|
|
Header, Length)
|
|
end}
|
|
end.
|
|
|
|
wrap(Header, Variable, Payload, Rest) ->
|
|
{ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
|
|
wrap(Header, Variable, Rest) ->
|
|
{ok, #mqtt_packet{header = Header, variable = Variable}, Rest}.
|
|
wrap(Header, Rest) ->
|
|
{ok, #mqtt_packet{header = Header}, Rest}.
|
|
|
|
%client function
|
|
%parse_qos(<<>>, Acc) ->
|
|
% lists:reverse(Acc);
|
|
%parse_qos(<<QoS:8/unsigned, Rest/binary>>, Acc) ->
|
|
% parse_qos(Rest, [QoS | Acc]).
|
|
|
|
parse_topics(_, <<>>, Topics) ->
|
|
lists:reverse(Topics);
|
|
parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
|
|
{Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
|
|
parse_topics(Sub, Rest, [{Name, QoS}| Topics]);
|
|
parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
|
|
{Name, <<Rest/binary>>} = parse_utf(Bin),
|
|
parse_topics(Sub, Rest, [Name | Topics]).
|
|
|
|
parse_utf(Bin, 0) ->
|
|
{undefined, Bin};
|
|
parse_utf(Bin, _) ->
|
|
parse_utf(Bin).
|
|
|
|
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
|
{Str, Rest}.
|
|
|
|
parse_msg(Bin, 0) ->
|
|
{undefined, Bin};
|
|
parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
|
|
{Msg, Rest}.
|
|
|
|
bool(0) -> false;
|
|
bool(1) -> true.
|
|
|
|
protocol_name_approved(Ver, Name) ->
|
|
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
|
|
|
%% Fix Issue#575
|
|
fixqos(?PUBREL, 0) -> 1;
|
|
fixqos(?SUBSCRIBE, 0) -> 1;
|
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
|
fixqos(_Type, QoS) -> QoS.
|
|
|