diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqtt/src/emqtt_packet.erl index 13bc32d61..0854feceb 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -18,10 +18,12 @@ %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. -%%------------------------------------------------------------------------------ -%% -%% The Original Code is from RabbitMQ. -%% +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttc received packet parser. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_packet). diff --git a/apps/emqtt/src/emqtt_parser.erl b/apps/emqtt/src/emqtt_parser.erl new file mode 100644 index 000000000..cfe76dcef --- /dev/null +++ b/apps/emqtt/src/emqtt_parser.erl @@ -0,0 +1,140 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttc received packet parser. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttc_parser). + +-author("feng@emqtt.io"). + +-include("emqttc_packet.hrl"). + +%% API +-export([new/0, parse/2]). + +%% TODO: REFACTOR... + +new() -> none. + +parse(<<>>, none) -> + {more, fun(Bin) -> parse(Bin, none) end}; +parse(<>, none) -> + parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) }); +parse(Bin, Cont) -> Cont(Bin). + +parse_remaining_len(<<>>, Header) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header) end}; +parse_remaining_len(Rest, Header) -> + parse_remaining_len(Rest, Header, 1, 0). +parse_remaining_len(_Bin, _Header, _Multiplier, Length) + when Length > ?MAX_LEN -> + {error, invalid_mqtt_frame_len}; +parse_remaining_len(<<>>, Header, Multiplier, Length) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length) end}; +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> + parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> + parse_frame(Rest, Header, Value + Len * Multiplier). + +parse_frame(Bin, #mqtt_packet_header{ type = Type, + qos = Qos } = Header, Length) -> + case {Type, Bin} of + {?CONNACK, <>} -> + <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin, + wrap(Header, #mqtt_packet_connack{ + ack_flags = SP, + return_code = ReturnCode }, Rest); + {?PUBLISH, <>} -> + {TopicName, Rest1} = parse_utf(FrameBin), + {PacketId, Payload} = case Qos of + 0 -> {undefined, Rest1}; + _ -> <> = Rest1, + {Id, R} + end, + wrap(Header, #mqtt_packet_publish { + topic_name = TopicName, + packet_id = PacketId }, Payload, Rest); + {?PUBACK, <>} -> + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); + {?PUBREC, <>} -> + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); + {?PUBREL, <>} -> + 1 = Qos, + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); + {?PUBCOMP, <>} -> + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); + {?SUBACK, <>} -> + <> = FrameBin, + wrap(Header, #mqtt_packet_suback { packet_id = PacketId, + qos_table = parse_qos(Rest1, []) }, Rest); + {?UNSUBACK, <>} -> + <> = FrameBin, + wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest); + {?PINGRESP, Rest} -> + Length = 0, + wrap(Header, Rest); + {_, TooShortBin} -> + {more, fun(BinMore) -> + parse_frame(<>, + Header, Length) + end} + end. + +parse_qos(<<>>, Acc) -> + lists:reverse(Acc); +parse_qos(<>, Acc) -> + parse_qos(Rest, [QoS | Acc]). + +%parse_utf(Bin, 0) -> +% {undefined, Bin}; +%parse_utf(Bin, _) -> +% parse_utf(Bin). + +parse_utf(<>) -> + {Str, Rest}. + +%parse_msg(Bin, 0) -> +% {undefined, Bin}; +%parse_msg(<>, _) -> +% {Msg, Rest}. + +wrap(Header, Variable, Payload, Rest) -> + {ok, #mqtt_packet{ header = Header, + variable = Variable, + payload = Payload }, Rest}. +wrap(Header, Variable, Rest) -> + {ok, #mqtt_packet { header = Header, + variable = Variable }, Rest}. +wrap(Header, Rest) -> + {ok, #mqtt_packet { header = Header }, Rest}. + +bool(0) -> false; +bool(1) -> true. diff --git a/apps/emqtt/src/emqtt_serialiser.erl b/apps/emqtt/src/emqtt_serialiser.erl new file mode 100644 index 000000000..b2032c669 --- /dev/null +++ b/apps/emqtt/src/emqtt_serialiser.erl @@ -0,0 +1,145 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttc packet serialiser. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttc_serialiser). + +-author("feng@emqtt.io"). + +-include("emqttc_packet.hrl"). + +%% API +-export([serialise/1]). + + +%%TODO: doc and spec... + +serialise(#mqtt_packet{ header = Header = #mqtt_packet_header{ type = Type }, + variable = Variable, + payload = Payload }) -> + serialise_header(Header, + serialise_variable(Type, Variable, + serialise_payload(Payload))). + +serialise_header(#mqtt_packet_header{ type = Type, + dup = Dup, + qos = Qos, + retain = Retain }, + {VariableBin, PayloadBin}) + when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT -> + Len = size(VariableBin) + size(PayloadBin), + true = (Len =< ?MAX_LEN), + LenBin = serialise_len(Len), + <>. + +serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, + proto_ver = ProtoVer, + proto_name = ProtoName, + will_retain = WillRetain, + will_qos = WillQos, + will_flag = WillFlag, + clean_sess = CleanSess, + keep_alive = KeepAlive, + will_topic = WillTopic, + will_msg = WillMsg, + username = Username, + password = Password }, undefined) -> + VariableBin = <<(size(ProtoName)):16/big-unsigned-integer, + ProtoName/binary, + ProtoVer:8, + (opt(Username)):1, + (opt(Password)):1, + (opt(WillRetain)):1, + WillQos:2, + (opt(WillFlag)):1, + (opt(CleanSess)):1, + 0:1, + KeepAlive:16/big-unsigned-integer>>, + PayloadBin = serialise_utf(ClientId), + PayloadBin1 = case WillFlag of + true -> <>; + false -> PayloadBin + end, + UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>, + {VariableBin, <>}; + +serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, + topic_table = Topics }, undefined) -> + {<>, serialise_topics(Topics)}; + +serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{ + packet_id = PacketId, topics = Topics }, undefined) -> + {<>, serialise_topics(Topics)}; + +serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName, + packet_id = PacketId }, PayloadBin) -> + TopicBin = serialise_utf(TopicName), + PacketIdBin = if + PacketId =:= undefined -> <<>>; + true -> <> + end, + {<>, PayloadBin}; + +serialise_variable(PubAck, #mqtt_packet_puback { packet_id = PacketId }, _Payload) + when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> + {<>, <<>>}; + +serialise_variable(?PINGREQ, undefined, undefined) -> + {<<>>, <<>>}; + +serialise_variable(?DISCONNECT, undefined, undefined) -> + {<<>>, <<>>}. + +serialise_payload(undefined) -> + undefined; +serialise_payload(Bin) when is_binary(Bin) -> + Bin. + +serialise_topics([{_Topic, _Qos}|_] = Topics) -> + << <<(serialise_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>; + +serialise_topics([H|_] = Topics) when is_binary(H) -> + << <<(serialise_utf(Topic))/binary>> || Topic <- Topics >>. + +serialise_utf(String) -> + StringBin = unicode:characters_to_binary(String), + Len = size(StringBin), + true = (Len =< 16#ffff), + <>. + +serialise_len(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialise_len(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>. + +opt(undefined) -> ?RESERVED; +opt(false) -> 0; +opt(true) -> 1; +opt(X) when is_integer(X) -> X; +opt(B) when is_binary(B) -> 1.