%%-------------------------------------------------------------------- %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- %% @doc MQTT Packet Parser -module(emqttd_parser). -author("Feng Lee "). -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). %% API -export([initial_state/0, initial_state/1, parse/2]). -type(max_packet_size() :: 1..?MAX_PACKET_SIZE). -spec(initial_state() -> {none, max_packet_size()}). initial_state() -> initial_state(?MAX_PACKET_SIZE). %% @doc Initialize a parser -spec(initial_state(max_packet_size()) -> {none, max_packet_size()}). initial_state(MaxSize) -> {none, MaxSize}. %% @doc Parse MQTT Packet -spec(parse(binary(), {none, pos_integer()} | fun()) -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}). parse(<<>>, {none, MaxLen}) -> {more, fun(Bin) -> parse(Bin, {none, MaxLen}) end}; parse(<>, {none, Limit}) -> parse_remaining_len(Rest, #mqtt_packet_header{type = Type, dup = bool(Dup), qos = fixqos(Type, QoS), retain = bool(Retain)}, Limit); parse(Bin, Cont) -> Cont(Bin). parse_remaining_len(<<>>, Header, Limit) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end}; parse_remaining_len(Rest, Header, Limit) -> parse_remaining_len(Rest, Header, 1, 0, Limit). parse_remaining_len(_Bin, _Header, _Multiplier, Length, MaxLen) when Length > MaxLen -> {error, invalid_mqtt_frame_len}; parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end}; %% optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, _Limit) -> parse_frame(Rest, Header, 2); %% optimize: match PINGREQ... parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, _Limit) -> parse_frame(Rest, Header, 0); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit); parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, MaxLen) -> FrameLen = Value + Len * Multiplier, if FrameLen > MaxLen -> {error, invalid_mqtt_frame_len}; true -> parse_frame(Rest, Header, FrameLen) end. parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -> case {Type, Bin} of {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), %% Fix mosquitto bridge: 0x83, 0x84 <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1, <> = Rest2, {ClientId, Rest4} = parse_utf(Rest3), {WillTopic, Rest5} = parse_utf(Rest4, WillFlag), {WillMsg, Rest6} = parse_msg(Rest5, WillFlag), {UserName, Rest7} = parse_utf(Rest6, UsernameFlag), {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), case protocol_name_approved(ProtoVersion, ProtoName) of true -> wrap(Header, #mqtt_packet_connect{ proto_ver = ProtoVersion, proto_name = ProtoName, will_retain = bool(WillRetain), will_qos = WillQos, will_flag = bool(WillFlag), clean_sess = bool(CleanSess), keep_alive = KeepAlive, client_id = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = UserName, password = PasssWord}, Rest); false -> {error, protocol_header_corrupt} end; %{?CONNACK, <>} -> % <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin, % wrap(Header, #mqtt_packet_connack{ack_flags = SP, % return_code = ReturnCode }, Rest); {?PUBLISH, <>} -> {TopicName, Rest1} = parse_utf(FrameBin), {PacketId, Payload} = case Qos of 0 -> {undefined, Rest1}; _ -> <> = Rest1, {Id, R} end, wrap(Header, #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}, Payload, Rest); {?PUBACK, <>} -> <> = FrameBin, wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBREC, <>} -> <> = FrameBin, wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBREL, <>} -> %% 1 = Qos, <> = FrameBin, wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBCOMP, <>} -> <> = FrameBin, wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?SUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, TopicTable = parse_topics(?SUBSCRIBE, Rest1, []), wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId, topic_table = TopicTable}, Rest); %{?SUBACK, <>} -> % <> = FrameBin, % wrap(Header, #mqtt_packet_suback{packet_id = PacketId, % qos_table = parse_qos(Rest1, []) }, Rest); {?UNSUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, Topics = parse_topics(?UNSUBSCRIBE, Rest1, []), wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId, topics = Topics}, Rest); %{?UNSUBACK, <>} -> % <> = FrameBin, % wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest); {?PINGREQ, Rest} -> Length = 0, wrap(Header, Rest); %{?PINGRESP, Rest} -> % Length = 0, % wrap(Header, Rest); {?DISCONNECT, Rest} -> Length = 0, wrap(Header, Rest); {_, TooShortBin} -> {more, fun(BinMore) -> parse_frame(<>, Header, Length) end} end. wrap(Header, Variable, Payload, Rest) -> {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. wrap(Header, Variable, Rest) -> {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}. wrap(Header, Rest) -> {ok, #mqtt_packet{header = Header}, Rest}. %client function %parse_qos(<<>>, Acc) -> % lists:reverse(Acc); %parse_qos(<>, Acc) -> % parse_qos(Rest, [QoS | Acc]). parse_topics(_, <<>>, Topics) -> lists:reverse(Topics); parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin), parse_topics(Sub, Rest, [{Name, QoS}| Topics]); parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> {Name, <>} = parse_utf(Bin), parse_topics(Sub, Rest, [Name | Topics]). parse_utf(Bin, 0) -> {undefined, Bin}; parse_utf(Bin, _) -> parse_utf(Bin). parse_utf(<>) -> {Str, Rest}. parse_msg(Bin, 0) -> {undefined, Bin}; parse_msg(<>, _) -> {Msg, Rest}. bool(0) -> false; bool(1) -> true. protocol_name_approved(Ver, Name) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). %% Fix Issue#575 fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS.