From e80a78b66edbcbdaea22d8574b4fddebf3be3846 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 5 Mar 2015 01:38:47 +0800 Subject: [PATCH] merge emqttc parser --- apps/emqtt/src/emqtt_packet.erl | 376 ++++++---------------------- apps/emqtt/src/emqtt_parser.erl | 174 +++++++++---- apps/emqtt/src/emqtt_serialiser.erl | 39 ++- 3 files changed, 229 insertions(+), 360 deletions(-) diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqtt/src/emqtt_packet.erl index 0854feceb..5f6ca98c3 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -1,269 +1,52 @@ -%%------------------------------------------------------------------------------ -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttc received packet parser. +%%% emqtt packet. %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqtt_packet). +-author("feng@emqtt.io"). + -include("emqtt_packet.hrl"). --export([initial_state/0]). - --export([parse/2, serialise/1]). +%% API +-export([protocol_name/1, type_name/1, connack_name/1]). -export([dump/1]). --define(MAX_LEN, 16#fffffff). --define(HIGHBIT, 2#10000000). --define(LOWBITS, 2#01111111). +protocol_name(Ver) when Ver =:= ?MQTT_PROTO_V31; Ver =:= ?MQTT_PROTO_V311-> + proplists:get_value(Ver, ?PROTOCOL_NAMES). -initial_state() -> none. +type_name(Type) when Type > ?RESERVED andalso Type =< ?DISCONNECT -> + lists:nth(Type, ?TYPE_NAMES). -parse(<<>>, none) -> - {more, fun(Bin) -> parse(Bin, none) end}; -parse(<>, none) -> - parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) }); -parse(Bin, Cont) -> Cont(Bin). - -parse_remaining_len(<<>>, Header) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header) end}; -parse_remaining_len(Rest, Header) -> - parse_remaining_len(Rest, Header, 1, 0). - -parse_remaining_len(_Bin, _Header, _Multiplier, Length) - when Length > ?MAX_LEN -> - {error, invalid_mqtt_frame_len}; -parse_remaining_len(<<>>, Header, Multiplier, Length) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length) end}; -parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> - parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> - parse_frame(Rest, Header, Value + Len * Multiplier). - -parse_frame(Bin, #mqtt_packet_header{ type = Type, - qos = Qos } = Header, Length) -> - case {Type, Bin} of - {?CONNECT, <>} -> - {ProtoName, Rest1} = parse_utf(FrameBin), - <> = Rest1, - <> = Rest2, - {ClientId, Rest4} = parse_utf(Rest3), - {WillTopic, Rest5} = parse_utf(Rest4, WillFlag), - {WillMsg, Rest6} = parse_msg(Rest5, WillFlag), - {UserName, Rest7} = parse_utf(Rest6, UsernameFlag), - {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), - case protocol_name_approved(ProtoVersion, ProtoName) of - true -> - wrap(Header, - #mqtt_packet_connect{ - proto_ver = ProtoVersion, - proto_name = ProtoName, - will_retain = bool(WillRetain), - will_qos = WillQos, - will_flag = bool(WillFlag), - clean_sess = bool(CleanSession), - keep_alive = KeepAlive, - client_id = ClientId, - will_topic = WillTopic, - will_msg = WillMsg, - username = UserName, - password = PasssWord}, Rest); - false -> - {error, protocol_header_corrupt} - end; - {?PUBLISH, <>} -> - {TopicName, Rest1} = parse_utf(FrameBin), - {PacketId, Payload} = case Qos of - 0 -> {undefined, Rest1}; - _ -> <> = Rest1, - {Id, R} - end, - wrap(Header, #mqtt_packet_publish {topic_name = TopicName, - packet_id = PacketId }, - Payload, Rest); - {?PUBACK, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); - {?PUBREC, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); - {?PUBREL, <>} -> - 1 = Qos, - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); - {?PUBCOMP, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); - {Subs, <>} - when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE -> - 1 = Qos, - <> = FrameBin, - Topics = parse_topics(Subs, Rest1, []), - wrap(Header, #mqtt_packet_subscribe { packet_id = PacketId, - topic_table = Topics }, Rest); - {Minimal, Rest} - when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ -> - Length = 0, - wrap(Header, Rest); - {_, TooShortBin} -> - {more, fun(BinMore) -> - parse_frame(<>, - Header, Length) - end} - end. - -parse_topics(_, <<>>, Topics) -> - Topics; -parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> - {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin), - parse_topics(Sub, Rest, [#mqtt_topic { name = Name, qos = QoS } | Topics]); -parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> - {Name, <>} = parse_utf(Bin), - parse_topics(Sub, Rest, [#mqtt_topic { name = Name } | Topics]). - -wrap(Header, Variable, Payload, Rest) -> - {ok, #mqtt_packet{ header = Header, variable = Variable, payload = Payload }, Rest}. -wrap(Header, Variable, Rest) -> - {ok, #mqtt_packet { header = Header, variable = Variable }, Rest}. -wrap(Header, Rest) -> - {ok, #mqtt_packet { header = Header }, Rest}. - -parse_utf(Bin, 0) -> - {undefined, Bin}; -parse_utf(Bin, _) -> - parse_utf(Bin). - -parse_utf(<>) -> - {Str, Rest}. - -parse_msg(Bin, 0) -> - {undefined, Bin}; -parse_msg(<>, _) -> - {Msg, Rest}. - -bool(0) -> false; -bool(1) -> true. - -%% serialisation - -serialise(#mqtt_packet{ header = Header, - variable = Variable, - payload = Payload }) -> - serialise_header(Header, - serialise_variable(Header, Variable, - serialise_payload(Payload))). - -serialise_payload(undefined) -> <<>>; -serialise_payload(B) when is_binary(B) -> B. - -serialise_variable(#mqtt_packet_header { type = ?CONNACK }, - #mqtt_packet_connack { ack_flags = AckFlags, - return_code = ReturnCode }, - <<>> = PayloadBin) -> - VariableBin = <>, - {VariableBin, PayloadBin}; - -serialise_variable(#mqtt_packet_header { type = SubAck }, - #mqtt_packet_suback { packet_id = PacketId, - qos_table = Qos }, - <<>> = _PayloadBin) - when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK -> - VariableBin = <>, - QosBin = << <> || Q <- Qos >>, - {VariableBin, QosBin}; - -serialise_variable(#mqtt_packet_header { type = ?PUBLISH, - qos = Qos }, - #mqtt_packet_publish { topic_name = TopicName, - packet_id = PacketId }, - PayloadBin) -> - TopicBin = serialise_utf(TopicName), - PacketIdBin = case Qos of - 0 -> <<>>; - 1 -> <>; - 2 -> <> - end, - {<>, PayloadBin}; - -serialise_variable(#mqtt_packet_header { type = PubAck }, - #mqtt_packet_puback { packet_id = PacketId }, - <<>> = _Payload) - when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; - PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> - {<>, <<>>}; - -serialise_variable(#mqtt_packet_header { }, - undefined, - <<>> = _PayloadBin) -> - {<<>>, <<>>}. - -serialise_header(#mqtt_packet_header{ type = Type, - dup = Dup, - qos = Qos, - retain = Retain }, - {VariableBin, PayloadBin}) - when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT -> - Len = size(VariableBin) + size(PayloadBin), - true = (Len =< ?MAX_LEN), - LenBin = serialise_len(Len), - <>. - -serialise_utf(String) -> - StringBin = unicode:characters_to_binary(String), - Len = size(StringBin), - true = (Len =< 16#ffff), - <>. - -serialise_len(N) when N =< ?LOWBITS -> - <<0:1, N:7>>; -serialise_len(N) -> - <<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>. - -opt(undefined) -> ?RESERVED; -opt(false) -> 0; -opt(true) -> 1; -opt(X) when is_integer(X) -> X. - -protocol_name_approved(Ver, Name) -> - lists:member({Ver, Name}, ?PROTOCOL_NAMES). - -dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) when - Payload =:= undefined orelse Payload =:= <<>> -> - dump_header(Header, dump_variable(Variable)); +connack_name(?CONNACK_ACCEPT) -> 'CONNACK_ACCEPT'; +connack_name(?CONNACK_PROTO_VER) -> 'CONNACK_PROTO_VER'; +connack_name(?CONNACK_INVALID_ID ) -> 'CONNACK_INVALID_ID'; +connack_name(?CONNACK_SERVER) -> 'CONNACK_SERVER'; +connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS'; +connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'. dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> dump_header(Header, dump_variable(Variable, Payload)). @@ -274,21 +57,28 @@ dump_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Reta S == undefined -> <<>>; true -> [", ", S] end, - io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s~s)", [dump_type(Type), QoS, Retain, Dup, S1]). + io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s~s)", [type_name(Type), QoS, Retain, Dup, S1]). -dump_variable( #mqtt_packet_connect { - proto_ver = ProtoVer, - proto_name = ProtoName, - will_retain = WillRetain, - will_qos = WillQoS, - will_flag = WillFlag, - clean_sess = CleanSess, - keep_alive = KeepAlive, - client_id = ClientId, - will_topic = WillTopic, - will_msg = WillMsg, - username = Username, - password = Password} ) -> +dump_variable(undefined, _) -> + undefined; +dump_variable(Variable, undefined) -> + dump_variable(Variable); +dump_variable(Variable, Payload) -> + io_lib:format("~s, Payload=~p", [dump_variable(Variable), Payload]). + +dump_variable(#mqtt_packet_connect{ + proto_ver = ProtoVer, + proto_name = ProtoName, + will_retain = WillRetain, + will_qos = WillQoS, + will_flag = WillFlag, + clean_sess = CleanSess, + keep_alive = KeepAlive, + client_id = ClientId, + will_topic = WillTopic, + will_msg = WillMsg, + username = Username, + password = Password} ) -> Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s", Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, dump_password(Password)], {Format1, Args1} = if @@ -298,58 +88,44 @@ dump_variable( #mqtt_packet_connect { end, io_lib:format(Format1, Args1); -dump_variable( #mqtt_packet_connack { - ack_flags = AckFlags, - return_code = ReturnCode } ) -> +dump_variable(#mqtt_packet_connack{ + ack_flags = AckFlags, + return_code = ReturnCode } ) -> io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); -dump_variable( #mqtt_packet_publish { +dump_variable(#mqtt_packet_publish{ topic_name = TopicName, packet_id = PacketId} ) -> io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]); -dump_variable( #mqtt_packet_puback { - packet_id = PacketId } ) -> +dump_variable(#mqtt_packet_puback{ + packet_id = PacketId } ) -> io_lib:format("PacketId=~p", [PacketId]); -dump_variable( #mqtt_packet_subscribe { - packet_id = PacketId, - topic_table = TopicTable }) -> - L = [{Name, QoS} || #mqtt_topic{name = Name, qos = QoS} <- TopicTable], - io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, L]); +dump_variable(#mqtt_packet_subscribe{ + packet_id = PacketId, + topic_table = TopicTable }) -> + io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, TopicTable]); -dump_variable( #mqtt_packet_suback { +dump_variable(#mqtt_packet_unsubscribe{ + packet_id = PacketId, + topics = Topics }) -> + io_lib:format("PacketId=~p, Topics=~p", [PacketId, Topics]); + +dump_variable(#mqtt_packet_suback{ packet_id = PacketId, qos_table = QosTable} ) -> io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]); +dump_variable(#mqtt_packet_unsuback{ + packet_id = PacketId } ) -> + io_lib:format("PacketId=~p", [PacketId]); + dump_variable(PacketId) when is_integer(PacketId) -> io_lib:format("PacketId=~p", [PacketId]); dump_variable(undefined) -> undefined. -dump_variable(undefined, undefined) -> - undefined; -dump_variable(undefined, <<>>) -> - undefined; -dump_variable(Variable, Payload) -> - io_lib:format("~s, Payload=~p", [dump_variable(Variable), Payload]). - dump_password(undefined) -> undefined; dump_password(_) -> <<"******">>. -dump_type(?CONNECT) -> "CONNECT"; -dump_type(?CONNACK) -> "CONNACK"; -dump_type(?PUBLISH) -> "PUBLISH"; -dump_type(?PUBACK) -> "PUBACK"; -dump_type(?PUBREC) -> "PUBREC"; -dump_type(?PUBREL) -> "PUBREL"; -dump_type(?PUBCOMP) -> "PUBCOMP"; -dump_type(?SUBSCRIBE) -> "SUBSCRIBE"; -dump_type(?SUBACK) -> "SUBACK"; -dump_type(?UNSUBSCRIBE) -> "UNSUBSCRIBE"; -dump_type(?UNSUBACK) -> "UNSUBACK"; -dump_type(?PINGREQ) -> "PINGREQ"; -dump_type(?PINGRESP) -> "PINGRESP"; -dump_type(?DISCONNECT) -> "DISCONNECT". - diff --git a/apps/emqtt/src/emqtt_parser.erl b/apps/emqtt/src/emqtt_parser.erl index cfe76dcef..2e4909688 100644 --- a/apps/emqtt/src/emqtt_parser.erl +++ b/apps/emqtt/src/emqtt_parser.erl @@ -20,36 +20,50 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttc received packet parser. +%%% emqtt received packet parser. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttc_parser). +-module(emqtt_parser). -author("feng@emqtt.io"). --include("emqttc_packet.hrl"). +-include("emqtt_packet.hrl"). %% API -export([new/0, parse/2]). -%% TODO: REFACTOR... +%%%----------------------------------------------------------------------------- +%% @doc +%% Initialize a parser. +%% +%% @end +%%%----------------------------------------------------------------------------- +-spec new() -> none. new() -> none. +%%%----------------------------------------------------------------------------- +%% @doc +%% Parse MQTT Packet. +%% +%% @end +%%%----------------------------------------------------------------------------- +-spec parse(binary(), none | fun()) -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}. parse(<<>>, none) -> {more, fun(Bin) -> parse(Bin, none) end}; parse(<>, none) -> parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) }); + dup = bool(Dup), + qos = QoS, + retain = bool(Retain)}); parse(Bin, Cont) -> Cont(Bin). parse_remaining_len(<<>>, Header) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header) end}; parse_remaining_len(Rest, Header) -> parse_remaining_len(Rest, Header, 1, 0). + parse_remaining_len(_Bin, _Header, _Multiplier, Length) when Length > ?MAX_LEN -> {error, invalid_mqtt_frame_len}; @@ -60,14 +74,49 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> parse_frame(Rest, Header, Value + Len * Multiplier). -parse_frame(Bin, #mqtt_packet_header{ type = Type, - qos = Qos } = Header, Length) -> +parse_frame(Bin, #mqtt_packet_header{type = Type, + qos = Qos} = Header, Length) -> case {Type, Bin} of - {?CONNACK, <>} -> - <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin, - wrap(Header, #mqtt_packet_connack{ - ack_flags = SP, - return_code = ReturnCode }, Rest); + {?CONNECT, <>} -> + {ProtoName, Rest1} = parse_utf(FrameBin), + <> = Rest1, + <> = Rest2, + {ClientId, Rest4} = parse_utf(Rest3), + {WillTopic, Rest5} = parse_utf(Rest4, WillFlag), + {WillMsg, Rest6} = parse_msg(Rest5, WillFlag), + {UserName, Rest7} = parse_utf(Rest6, UsernameFlag), + {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), + case protocol_name_approved(ProtoVersion, ProtoName) of + true -> + wrap(Header, + #mqtt_packet_connect{ + proto_ver = ProtoVersion, + proto_name = ProtoName, + will_retain = bool(WillRetain), + will_qos = WillQos, + will_flag = bool(WillFlag), + clean_sess = bool(CleanSession), + keep_alive = KeepAlive, + client_id = ClientId, + will_topic = WillTopic, + will_msg = WillMsg, + username = UserName, + password = PasssWord}, Rest); + false -> + {error, protocol_header_corrupt} + end; + %{?CONNACK, <>} -> + % <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin, + % wrap(Header, #mqtt_packet_connack{ack_flags = SP, + % return_code = ReturnCode }, Rest); {?PUBLISH, <>} -> {TopicName, Rest1} = parse_utf(FrameBin), {PacketId, Payload} = case Qos of @@ -75,9 +124,9 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type, _ -> <> = Rest1, {Id, R} end, - wrap(Header, #mqtt_packet_publish { - topic_name = TopicName, - packet_id = PacketId }, Payload, Rest); + wrap(Header, #mqtt_packet_publish{topic_name = TopicName, + packet_id = PacketId}, + Payload, Rest); {?PUBACK, <>} -> <> = FrameBin, wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); @@ -87,18 +136,36 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type, {?PUBREL, <>} -> 1 = Qos, <> = FrameBin, - wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBCOMP, <>} -> <> = FrameBin, - wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); - {?SUBACK, <>} -> + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); + {?SUBSCRIBE, <>} -> + 1 = Qos, <> = FrameBin, - wrap(Header, #mqtt_packet_suback { packet_id = PacketId, - qos_table = parse_qos(Rest1, []) }, Rest); - {?UNSUBACK, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest); - {?PINGRESP, Rest} -> + TopicTable = parse_topics(?SUBSCRIBE, Rest1, []), + wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId, + topic_table = TopicTable}, Rest); + %{?SUBACK, <>} -> + % <> = FrameBin, + % wrap(Header, #mqtt_packet_suback{packet_id = PacketId, + % qos_table = parse_qos(Rest1, []) }, Rest); + {?UNSUBSCRIBE, <>} -> + 1 = Qos, + <> = FrameBin, + Topics = parse_topics(?UNSUBSCRIBE, Rest1, []), + wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId, + topics = Topics}, Rest); + %{?UNSUBACK, <>} -> + % <> = FrameBin, + % wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest); + {?PINGREQ, Rest} -> + Length = 0, + wrap(Header, Rest); + %{?PINGRESP, Rest} -> + % Length = 0, + % wrap(Header, Rest); + {?DISCONNECT, Rest} -> Length = 0, wrap(Header, Rest); {_, TooShortBin} -> @@ -108,33 +175,44 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type, end} end. -parse_qos(<<>>, Acc) -> - lists:reverse(Acc); -parse_qos(<>, Acc) -> - parse_qos(Rest, [QoS | Acc]). +wrap(Header, Variable, Payload, Rest) -> + {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. +wrap(Header, Variable, Rest) -> + {ok, #mqtt_packet {header = Header, variable = Variable}, Rest}. +wrap(Header, Rest) -> + {ok, #mqtt_packet {header = Header}, Rest}. -%parse_utf(Bin, 0) -> -% {undefined, Bin}; -%parse_utf(Bin, _) -> -% parse_utf(Bin). +%client function +%parse_qos(<<>>, Acc) -> +% lists:reverse(Acc); +%parse_qos(<>, Acc) -> +% parse_qos(Rest, [QoS | Acc]). + +parse_topics(_, <<>>, Topics) -> + Topics; +parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> + {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin), + parse_topics(Sub, Rest, [{Name, QoS}| Topics]); +parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> + {Name, <>} = parse_utf(Bin), + parse_topics(Sub, Rest, [Name | Topics]). + +parse_utf(Bin, 0) -> + {undefined, Bin}; +parse_utf(Bin, _) -> + parse_utf(Bin). parse_utf(<>) -> {Str, Rest}. -%parse_msg(Bin, 0) -> -% {undefined, Bin}; -%parse_msg(<>, _) -> -% {Msg, Rest}. - -wrap(Header, Variable, Payload, Rest) -> - {ok, #mqtt_packet{ header = Header, - variable = Variable, - payload = Payload }, Rest}. -wrap(Header, Variable, Rest) -> - {ok, #mqtt_packet { header = Header, - variable = Variable }, Rest}. -wrap(Header, Rest) -> - {ok, #mqtt_packet { header = Header }, Rest}. +parse_msg(Bin, 0) -> + {undefined, Bin}; +parse_msg(<>, _) -> + {Msg, Rest}. bool(0) -> false; bool(1) -> true. + +protocol_name_approved(Ver, Name) -> + lists:member({Ver, Name}, ?PROTOCOL_NAMES). + diff --git a/apps/emqtt/src/emqtt_serialiser.erl b/apps/emqtt/src/emqtt_serialiser.erl index b2032c669..152c5e230 100644 --- a/apps/emqtt/src/emqtt_serialiser.erl +++ b/apps/emqtt/src/emqtt_serialiser.erl @@ -20,15 +20,15 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttc packet serialiser. +%%% emqtt packet serialiser. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttc_serialiser). +-module(emqtt_serialiser). -author("feng@emqtt.io"). --include("emqttc_packet.hrl"). +-include("emqtt_packet.hrl"). %% API -export([serialise/1]). @@ -36,24 +36,24 @@ %%TODO: doc and spec... -serialise(#mqtt_packet{ header = Header = #mqtt_packet_header{ type = Type }, - variable = Variable, - payload = Payload }) -> +serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type}, + variable = Variable, + payload = Payload}) -> serialise_header(Header, serialise_variable(Type, Variable, serialise_payload(Payload))). -serialise_header(#mqtt_packet_header{ type = Type, - dup = Dup, - qos = Qos, - retain = Retain }, +serialise_header(#mqtt_packet_header{type = Type, + dup = Dup, + qos = Qos, + retain = Retain}, {VariableBin, PayloadBin}) when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT -> Len = size(VariableBin) + size(PayloadBin), true = (Len =< ?MAX_LEN), LenBin = serialise_len(Len), - <>. + <>. serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, proto_ver = ProtoVer, @@ -89,14 +89,28 @@ serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>, {VariableBin, <>}; +serialise_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags, + return_code = ReturnCode }, + undefined) -> + {<>, <<>>}; + serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, topic_table = Topics }, undefined) -> {<>, serialise_topics(Topics)}; +serialise_variable(?SUBACK, #mqtt_packet_suback {packet_id = PacketId, + qos_table = QosTable}, + undefined) + {<>, << <> || Q <- QosTable >>}; + serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{ packet_id = PacketId, topics = Topics }, undefined) -> {<>, serialise_topics(Topics)}; +serialise_variable(?UNSUBACK, #mqtt_packet_suback {packet_id = PacketId}, + undefined) + {<>, <<>>}; + serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName, packet_id = PacketId }, PayloadBin) -> TopicBin = serialise_utf(TopicName), @@ -143,3 +157,4 @@ opt(false) -> 0; opt(true) -> 1; opt(X) when is_integer(X) -> X; opt(B) when is_binary(B) -> 1. +