merge emqttc parser
This commit is contained in:
parent
9512b18c53
commit
e80a78b66e
|
@ -1,269 +1,52 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
%%
|
%%%
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
%% in the Software without restriction, including without limitation the rights
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
%% furnished to do so, subject to the following conditions:
|
%%% furnished to do so, subject to the following conditions:
|
||||||
%%
|
%%%
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
%% copies or substantial portions of the Software.
|
%%% copies or substantial portions of the Software.
|
||||||
%%
|
%%%
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttc received packet parser.
|
%%% emqtt packet.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqtt_packet).
|
-module(emqtt_packet).
|
||||||
|
|
||||||
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-export([initial_state/0]).
|
%% API
|
||||||
|
-export([protocol_name/1, type_name/1, connack_name/1]).
|
||||||
-export([parse/2, serialise/1]).
|
|
||||||
|
|
||||||
-export([dump/1]).
|
-export([dump/1]).
|
||||||
|
|
||||||
-define(MAX_LEN, 16#fffffff).
|
protocol_name(Ver) when Ver =:= ?MQTT_PROTO_V31; Ver =:= ?MQTT_PROTO_V311->
|
||||||
-define(HIGHBIT, 2#10000000).
|
proplists:get_value(Ver, ?PROTOCOL_NAMES).
|
||||||
-define(LOWBITS, 2#01111111).
|
|
||||||
|
|
||||||
initial_state() -> none.
|
type_name(Type) when Type > ?RESERVED andalso Type =< ?DISCONNECT ->
|
||||||
|
lists:nth(Type, ?TYPE_NAMES).
|
||||||
|
|
||||||
parse(<<>>, none) ->
|
connack_name(?CONNACK_ACCEPT) -> 'CONNACK_ACCEPT';
|
||||||
{more, fun(Bin) -> parse(Bin, none) end};
|
connack_name(?CONNACK_PROTO_VER) -> 'CONNACK_PROTO_VER';
|
||||||
parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) ->
|
connack_name(?CONNACK_INVALID_ID ) -> 'CONNACK_INVALID_ID';
|
||||||
parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType,
|
connack_name(?CONNACK_SERVER) -> 'CONNACK_SERVER';
|
||||||
dup = bool(Dup),
|
connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS';
|
||||||
qos = QoS,
|
connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
|
||||||
retain = bool(Retain) });
|
|
||||||
parse(Bin, Cont) -> Cont(Bin).
|
|
||||||
|
|
||||||
parse_remaining_len(<<>>, Header) ->
|
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header) end};
|
|
||||||
parse_remaining_len(Rest, Header) ->
|
|
||||||
parse_remaining_len(Rest, Header, 1, 0).
|
|
||||||
|
|
||||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length)
|
|
||||||
when Length > ?MAX_LEN ->
|
|
||||||
{error, invalid_mqtt_frame_len};
|
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length) ->
|
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length) end};
|
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) ->
|
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
|
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) ->
|
|
||||||
parse_frame(Rest, Header, Value + Len * Multiplier).
|
|
||||||
|
|
||||||
parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
|
||||||
qos = Qos } = Header, Length) ->
|
|
||||||
case {Type, Bin} of
|
|
||||||
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
{ProtoName, Rest1} = parse_utf(FrameBin),
|
|
||||||
<<ProtoVersion : 8, Rest2/binary>> = Rest1,
|
|
||||||
<<UsernameFlag : 1,
|
|
||||||
PasswordFlag : 1,
|
|
||||||
WillRetain : 1,
|
|
||||||
WillQos : 2,
|
|
||||||
WillFlag : 1,
|
|
||||||
CleanSession : 1,
|
|
||||||
_Reserved : 1,
|
|
||||||
KeepAlive : 16/big,
|
|
||||||
Rest3/binary>> = Rest2,
|
|
||||||
{ClientId, Rest4} = parse_utf(Rest3),
|
|
||||||
{WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
|
|
||||||
{WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
|
|
||||||
{UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
|
|
||||||
{PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
|
|
||||||
case protocol_name_approved(ProtoVersion, ProtoName) of
|
|
||||||
true ->
|
|
||||||
wrap(Header,
|
|
||||||
#mqtt_packet_connect{
|
|
||||||
proto_ver = ProtoVersion,
|
|
||||||
proto_name = ProtoName,
|
|
||||||
will_retain = bool(WillRetain),
|
|
||||||
will_qos = WillQos,
|
|
||||||
will_flag = bool(WillFlag),
|
|
||||||
clean_sess = bool(CleanSession),
|
|
||||||
keep_alive = KeepAlive,
|
|
||||||
client_id = ClientId,
|
|
||||||
will_topic = WillTopic,
|
|
||||||
will_msg = WillMsg,
|
|
||||||
username = UserName,
|
|
||||||
password = PasssWord}, Rest);
|
|
||||||
false ->
|
|
||||||
{error, protocol_header_corrupt}
|
|
||||||
end;
|
|
||||||
{?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
{TopicName, Rest1} = parse_utf(FrameBin),
|
|
||||||
{PacketId, Payload} = case Qos of
|
|
||||||
0 -> {undefined, Rest1};
|
|
||||||
_ -> <<Id:16/big, R/binary>> = Rest1,
|
|
||||||
{Id, R}
|
|
||||||
end,
|
|
||||||
wrap(Header, #mqtt_packet_publish {topic_name = TopicName,
|
|
||||||
packet_id = PacketId },
|
|
||||||
Payload, Rest);
|
|
||||||
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
<<PacketId:16/big>> = FrameBin,
|
|
||||||
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
||||||
{?PUBREC, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
<<PacketId:16/big>> = FrameBin,
|
|
||||||
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
|
||||||
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
1 = Qos,
|
|
||||||
<<PacketId:16/big>> = FrameBin,
|
|
||||||
wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest);
|
|
||||||
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
|
|
||||||
<<PacketId:16/big>> = FrameBin,
|
|
||||||
wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest);
|
|
||||||
{Subs, <<FrameBin:Length/binary, Rest/binary>>}
|
|
||||||
when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE ->
|
|
||||||
1 = Qos,
|
|
||||||
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
|
||||||
Topics = parse_topics(Subs, Rest1, []),
|
|
||||||
wrap(Header, #mqtt_packet_subscribe { packet_id = PacketId,
|
|
||||||
topic_table = Topics }, Rest);
|
|
||||||
{Minimal, Rest}
|
|
||||||
when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ ->
|
|
||||||
Length = 0,
|
|
||||||
wrap(Header, Rest);
|
|
||||||
{_, TooShortBin} ->
|
|
||||||
{more, fun(BinMore) ->
|
|
||||||
parse_frame(<<TooShortBin/binary, BinMore/binary>>,
|
|
||||||
Header, Length)
|
|
||||||
end}
|
|
||||||
end.
|
|
||||||
|
|
||||||
parse_topics(_, <<>>, Topics) ->
|
|
||||||
Topics;
|
|
||||||
parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
|
|
||||||
{Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
|
|
||||||
parse_topics(Sub, Rest, [#mqtt_topic { name = Name, qos = QoS } | Topics]);
|
|
||||||
parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
|
|
||||||
{Name, <<Rest/binary>>} = parse_utf(Bin),
|
|
||||||
parse_topics(Sub, Rest, [#mqtt_topic { name = Name } | Topics]).
|
|
||||||
|
|
||||||
wrap(Header, Variable, Payload, Rest) ->
|
|
||||||
{ok, #mqtt_packet{ header = Header, variable = Variable, payload = Payload }, Rest}.
|
|
||||||
wrap(Header, Variable, Rest) ->
|
|
||||||
{ok, #mqtt_packet { header = Header, variable = Variable }, Rest}.
|
|
||||||
wrap(Header, Rest) ->
|
|
||||||
{ok, #mqtt_packet { header = Header }, Rest}.
|
|
||||||
|
|
||||||
parse_utf(Bin, 0) ->
|
|
||||||
{undefined, Bin};
|
|
||||||
parse_utf(Bin, _) ->
|
|
||||||
parse_utf(Bin).
|
|
||||||
|
|
||||||
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
|
||||||
{Str, Rest}.
|
|
||||||
|
|
||||||
parse_msg(Bin, 0) ->
|
|
||||||
{undefined, Bin};
|
|
||||||
parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
|
|
||||||
{Msg, Rest}.
|
|
||||||
|
|
||||||
bool(0) -> false;
|
|
||||||
bool(1) -> true.
|
|
||||||
|
|
||||||
%% serialisation
|
|
||||||
|
|
||||||
serialise(#mqtt_packet{ header = Header,
|
|
||||||
variable = Variable,
|
|
||||||
payload = Payload }) ->
|
|
||||||
serialise_header(Header,
|
|
||||||
serialise_variable(Header, Variable,
|
|
||||||
serialise_payload(Payload))).
|
|
||||||
|
|
||||||
serialise_payload(undefined) -> <<>>;
|
|
||||||
serialise_payload(B) when is_binary(B) -> B.
|
|
||||||
|
|
||||||
serialise_variable(#mqtt_packet_header { type = ?CONNACK },
|
|
||||||
#mqtt_packet_connack { ack_flags = AckFlags,
|
|
||||||
return_code = ReturnCode },
|
|
||||||
<<>> = PayloadBin) ->
|
|
||||||
VariableBin = <<AckFlags:8, ReturnCode:8>>,
|
|
||||||
{VariableBin, PayloadBin};
|
|
||||||
|
|
||||||
serialise_variable(#mqtt_packet_header { type = SubAck },
|
|
||||||
#mqtt_packet_suback { packet_id = PacketId,
|
|
||||||
qos_table = Qos },
|
|
||||||
<<>> = _PayloadBin)
|
|
||||||
when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK ->
|
|
||||||
VariableBin = <<PacketId:16/big>>,
|
|
||||||
QosBin = << <<Q:8>> || Q <- Qos >>,
|
|
||||||
{VariableBin, QosBin};
|
|
||||||
|
|
||||||
serialise_variable(#mqtt_packet_header { type = ?PUBLISH,
|
|
||||||
qos = Qos },
|
|
||||||
#mqtt_packet_publish { topic_name = TopicName,
|
|
||||||
packet_id = PacketId },
|
|
||||||
PayloadBin) ->
|
|
||||||
TopicBin = serialise_utf(TopicName),
|
|
||||||
PacketIdBin = case Qos of
|
|
||||||
0 -> <<>>;
|
|
||||||
1 -> <<PacketId:16/big>>;
|
|
||||||
2 -> <<PacketId:16/big>>
|
|
||||||
end,
|
|
||||||
{<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin};
|
|
||||||
|
|
||||||
serialise_variable(#mqtt_packet_header { type = PubAck },
|
|
||||||
#mqtt_packet_puback { packet_id = PacketId },
|
|
||||||
<<>> = _Payload)
|
|
||||||
when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC;
|
|
||||||
PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP ->
|
|
||||||
{<<PacketId:16/big>>, <<>>};
|
|
||||||
|
|
||||||
serialise_variable(#mqtt_packet_header { },
|
|
||||||
undefined,
|
|
||||||
<<>> = _PayloadBin) ->
|
|
||||||
{<<>>, <<>>}.
|
|
||||||
|
|
||||||
serialise_header(#mqtt_packet_header{ type = Type,
|
|
||||||
dup = Dup,
|
|
||||||
qos = Qos,
|
|
||||||
retain = Retain },
|
|
||||||
{VariableBin, PayloadBin})
|
|
||||||
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
|
|
||||||
Len = size(VariableBin) + size(PayloadBin),
|
|
||||||
true = (Len =< ?MAX_LEN),
|
|
||||||
LenBin = serialise_len(Len),
|
|
||||||
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
|
|
||||||
LenBin/binary, VariableBin/binary, PayloadBin/binary>>.
|
|
||||||
|
|
||||||
serialise_utf(String) ->
|
|
||||||
StringBin = unicode:characters_to_binary(String),
|
|
||||||
Len = size(StringBin),
|
|
||||||
true = (Len =< 16#ffff),
|
|
||||||
<<Len:16/big, StringBin/binary>>.
|
|
||||||
|
|
||||||
serialise_len(N) when N =< ?LOWBITS ->
|
|
||||||
<<0:1, N:7>>;
|
|
||||||
serialise_len(N) ->
|
|
||||||
<<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>.
|
|
||||||
|
|
||||||
opt(undefined) -> ?RESERVED;
|
|
||||||
opt(false) -> 0;
|
|
||||||
opt(true) -> 1;
|
|
||||||
opt(X) when is_integer(X) -> X.
|
|
||||||
|
|
||||||
protocol_name_approved(Ver, Name) ->
|
|
||||||
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
|
||||||
|
|
||||||
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) when
|
|
||||||
Payload =:= undefined orelse Payload =:= <<>> ->
|
|
||||||
dump_header(Header, dump_variable(Variable));
|
|
||||||
|
|
||||||
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||||
dump_header(Header, dump_variable(Variable, Payload)).
|
dump_header(Header, dump_variable(Variable, Payload)).
|
||||||
|
@ -274,21 +57,28 @@ dump_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Reta
|
||||||
S == undefined -> <<>>;
|
S == undefined -> <<>>;
|
||||||
true -> [", ", S]
|
true -> [", ", S]
|
||||||
end,
|
end,
|
||||||
io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s~s)", [dump_type(Type), QoS, Retain, Dup, S1]).
|
io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s~s)", [type_name(Type), QoS, Retain, Dup, S1]).
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_connect {
|
dump_variable(undefined, _) ->
|
||||||
proto_ver = ProtoVer,
|
undefined;
|
||||||
proto_name = ProtoName,
|
dump_variable(Variable, undefined) ->
|
||||||
will_retain = WillRetain,
|
dump_variable(Variable);
|
||||||
will_qos = WillQoS,
|
dump_variable(Variable, Payload) ->
|
||||||
will_flag = WillFlag,
|
io_lib:format("~s, Payload=~p", [dump_variable(Variable), Payload]).
|
||||||
clean_sess = CleanSess,
|
|
||||||
keep_alive = KeepAlive,
|
dump_variable(#mqtt_packet_connect{
|
||||||
client_id = ClientId,
|
proto_ver = ProtoVer,
|
||||||
will_topic = WillTopic,
|
proto_name = ProtoName,
|
||||||
will_msg = WillMsg,
|
will_retain = WillRetain,
|
||||||
username = Username,
|
will_qos = WillQoS,
|
||||||
password = Password} ) ->
|
will_flag = WillFlag,
|
||||||
|
clean_sess = CleanSess,
|
||||||
|
keep_alive = KeepAlive,
|
||||||
|
client_id = ClientId,
|
||||||
|
will_topic = WillTopic,
|
||||||
|
will_msg = WillMsg,
|
||||||
|
username = Username,
|
||||||
|
password = Password} ) ->
|
||||||
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
|
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
|
||||||
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, dump_password(Password)],
|
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, dump_password(Password)],
|
||||||
{Format1, Args1} = if
|
{Format1, Args1} = if
|
||||||
|
@ -298,58 +88,44 @@ dump_variable( #mqtt_packet_connect {
|
||||||
end,
|
end,
|
||||||
io_lib:format(Format1, Args1);
|
io_lib:format(Format1, Args1);
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_connack {
|
dump_variable(#mqtt_packet_connack{
|
||||||
ack_flags = AckFlags,
|
ack_flags = AckFlags,
|
||||||
return_code = ReturnCode } ) ->
|
return_code = ReturnCode } ) ->
|
||||||
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]);
|
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]);
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_publish {
|
dump_variable(#mqtt_packet_publish{
|
||||||
topic_name = TopicName,
|
topic_name = TopicName,
|
||||||
packet_id = PacketId} ) ->
|
packet_id = PacketId} ) ->
|
||||||
io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]);
|
io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]);
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_puback {
|
dump_variable(#mqtt_packet_puback{
|
||||||
packet_id = PacketId } ) ->
|
packet_id = PacketId } ) ->
|
||||||
io_lib:format("PacketId=~p", [PacketId]);
|
io_lib:format("PacketId=~p", [PacketId]);
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_subscribe {
|
dump_variable(#mqtt_packet_subscribe{
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
topic_table = TopicTable }) ->
|
topic_table = TopicTable }) ->
|
||||||
L = [{Name, QoS} || #mqtt_topic{name = Name, qos = QoS} <- TopicTable],
|
io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, TopicTable]);
|
||||||
io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, L]);
|
|
||||||
|
|
||||||
dump_variable( #mqtt_packet_suback {
|
dump_variable(#mqtt_packet_unsubscribe{
|
||||||
|
packet_id = PacketId,
|
||||||
|
topics = Topics }) ->
|
||||||
|
io_lib:format("PacketId=~p, Topics=~p", [PacketId, Topics]);
|
||||||
|
|
||||||
|
dump_variable(#mqtt_packet_suback{
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
qos_table = QosTable} ) ->
|
qos_table = QosTable} ) ->
|
||||||
io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]);
|
io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]);
|
||||||
|
|
||||||
|
dump_variable(#mqtt_packet_unsuback{
|
||||||
|
packet_id = PacketId } ) ->
|
||||||
|
io_lib:format("PacketId=~p", [PacketId]);
|
||||||
|
|
||||||
dump_variable(PacketId) when is_integer(PacketId) ->
|
dump_variable(PacketId) when is_integer(PacketId) ->
|
||||||
io_lib:format("PacketId=~p", [PacketId]);
|
io_lib:format("PacketId=~p", [PacketId]);
|
||||||
|
|
||||||
dump_variable(undefined) -> undefined.
|
dump_variable(undefined) -> undefined.
|
||||||
|
|
||||||
dump_variable(undefined, undefined) ->
|
|
||||||
undefined;
|
|
||||||
dump_variable(undefined, <<>>) ->
|
|
||||||
undefined;
|
|
||||||
dump_variable(Variable, Payload) ->
|
|
||||||
io_lib:format("~s, Payload=~p", [dump_variable(Variable), Payload]).
|
|
||||||
|
|
||||||
dump_password(undefined) -> undefined;
|
dump_password(undefined) -> undefined;
|
||||||
dump_password(_) -> <<"******">>.
|
dump_password(_) -> <<"******">>.
|
||||||
|
|
||||||
dump_type(?CONNECT) -> "CONNECT";
|
|
||||||
dump_type(?CONNACK) -> "CONNACK";
|
|
||||||
dump_type(?PUBLISH) -> "PUBLISH";
|
|
||||||
dump_type(?PUBACK) -> "PUBACK";
|
|
||||||
dump_type(?PUBREC) -> "PUBREC";
|
|
||||||
dump_type(?PUBREL) -> "PUBREL";
|
|
||||||
dump_type(?PUBCOMP) -> "PUBCOMP";
|
|
||||||
dump_type(?SUBSCRIBE) -> "SUBSCRIBE";
|
|
||||||
dump_type(?SUBACK) -> "SUBACK";
|
|
||||||
dump_type(?UNSUBSCRIBE) -> "UNSUBSCRIBE";
|
|
||||||
dump_type(?UNSUBACK) -> "UNSUBACK";
|
|
||||||
dump_type(?PINGREQ) -> "PINGREQ";
|
|
||||||
dump_type(?PINGRESP) -> "PINGRESP";
|
|
||||||
dump_type(?DISCONNECT) -> "DISCONNECT".
|
|
||||||
|
|
||||||
|
|
|
@ -20,36 +20,50 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttc received packet parser.
|
%%% emqtt received packet parser.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttc_parser).
|
-module(emqtt_parser).
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqttc_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([new/0, parse/2]).
|
-export([new/0, parse/2]).
|
||||||
|
|
||||||
%% TODO: REFACTOR...
|
|
||||||
|
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Initialize a parser.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
-spec new() -> none.
|
||||||
new() -> none.
|
new() -> none.
|
||||||
|
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Parse MQTT Packet.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
-spec parse(binary(), none | fun()) -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}.
|
||||||
parse(<<>>, none) ->
|
parse(<<>>, none) ->
|
||||||
{more, fun(Bin) -> parse(Bin, none) end};
|
{more, fun(Bin) -> parse(Bin, none) end};
|
||||||
parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) ->
|
parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) ->
|
||||||
parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType,
|
parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType,
|
||||||
dup = bool(Dup),
|
dup = bool(Dup),
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
retain = bool(Retain) });
|
retain = bool(Retain)});
|
||||||
parse(Bin, Cont) -> Cont(Bin).
|
parse(Bin, Cont) -> Cont(Bin).
|
||||||
|
|
||||||
parse_remaining_len(<<>>, Header) ->
|
parse_remaining_len(<<>>, Header) ->
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header) end};
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header) end};
|
||||||
parse_remaining_len(Rest, Header) ->
|
parse_remaining_len(Rest, Header) ->
|
||||||
parse_remaining_len(Rest, Header, 1, 0).
|
parse_remaining_len(Rest, Header, 1, 0).
|
||||||
|
|
||||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length)
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length)
|
||||||
when Length > ?MAX_LEN ->
|
when Length > ?MAX_LEN ->
|
||||||
{error, invalid_mqtt_frame_len};
|
{error, invalid_mqtt_frame_len};
|
||||||
|
@ -60,14 +74,49 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) ->
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) ->
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) ->
|
||||||
parse_frame(Rest, Header, Value + Len * Multiplier).
|
parse_frame(Rest, Header, Value + Len * Multiplier).
|
||||||
|
|
||||||
parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
parse_frame(Bin, #mqtt_packet_header{type = Type,
|
||||||
qos = Qos } = Header, Length) ->
|
qos = Qos} = Header, Length) ->
|
||||||
case {Type, Bin} of
|
case {Type, Bin} of
|
||||||
{?CONNACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
<<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin,
|
{ProtoName, Rest1} = parse_utf(FrameBin),
|
||||||
wrap(Header, #mqtt_packet_connack{
|
<<ProtoVersion : 8, Rest2/binary>> = Rest1,
|
||||||
ack_flags = SP,
|
<<UsernameFlag : 1,
|
||||||
return_code = ReturnCode }, Rest);
|
PasswordFlag : 1,
|
||||||
|
WillRetain : 1,
|
||||||
|
WillQos : 2,
|
||||||
|
WillFlag : 1,
|
||||||
|
CleanSession : 1,
|
||||||
|
_Reserved : 1,
|
||||||
|
KeepAlive : 16/big,
|
||||||
|
Rest3/binary>> = Rest2,
|
||||||
|
{ClientId, Rest4} = parse_utf(Rest3),
|
||||||
|
{WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
|
||||||
|
{WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
|
||||||
|
{UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
|
||||||
|
{PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
|
||||||
|
case protocol_name_approved(ProtoVersion, ProtoName) of
|
||||||
|
true ->
|
||||||
|
wrap(Header,
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_ver = ProtoVersion,
|
||||||
|
proto_name = ProtoName,
|
||||||
|
will_retain = bool(WillRetain),
|
||||||
|
will_qos = WillQos,
|
||||||
|
will_flag = bool(WillFlag),
|
||||||
|
clean_sess = bool(CleanSession),
|
||||||
|
keep_alive = KeepAlive,
|
||||||
|
client_id = ClientId,
|
||||||
|
will_topic = WillTopic,
|
||||||
|
will_msg = WillMsg,
|
||||||
|
username = UserName,
|
||||||
|
password = PasssWord}, Rest);
|
||||||
|
false ->
|
||||||
|
{error, protocol_header_corrupt}
|
||||||
|
end;
|
||||||
|
%{?CONNACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
|
% <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin,
|
||||||
|
% wrap(Header, #mqtt_packet_connack{ack_flags = SP,
|
||||||
|
% return_code = ReturnCode }, Rest);
|
||||||
{?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
{TopicName, Rest1} = parse_utf(FrameBin),
|
{TopicName, Rest1} = parse_utf(FrameBin),
|
||||||
{PacketId, Payload} = case Qos of
|
{PacketId, Payload} = case Qos of
|
||||||
|
@ -75,9 +124,9 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
||||||
_ -> <<Id:16/big, R/binary>> = Rest1,
|
_ -> <<Id:16/big, R/binary>> = Rest1,
|
||||||
{Id, R}
|
{Id, R}
|
||||||
end,
|
end,
|
||||||
wrap(Header, #mqtt_packet_publish {
|
wrap(Header, #mqtt_packet_publish{topic_name = TopicName,
|
||||||
topic_name = TopicName,
|
packet_id = PacketId},
|
||||||
packet_id = PacketId }, Payload, Rest);
|
Payload, Rest);
|
||||||
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
<<PacketId:16/big>> = FrameBin,
|
<<PacketId:16/big>> = FrameBin,
|
||||||
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
||||||
|
@ -87,18 +136,36 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
||||||
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
1 = Qos,
|
1 = Qos,
|
||||||
<<PacketId:16/big>> = FrameBin,
|
<<PacketId:16/big>> = FrameBin,
|
||||||
wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest);
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
||||||
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
<<PacketId:16/big>> = FrameBin,
|
<<PacketId:16/big>> = FrameBin,
|
||||||
wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest);
|
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
|
||||||
{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
|
1 = Qos,
|
||||||
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
||||||
wrap(Header, #mqtt_packet_suback { packet_id = PacketId,
|
TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
|
||||||
qos_table = parse_qos(Rest1, []) }, Rest);
|
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
{?UNSUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
topic_table = TopicTable}, Rest);
|
||||||
<<PacketId:16/big>> = FrameBin,
|
%{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest);
|
% <<PacketId:16/big, Rest1/binary>> = FrameBin,
|
||||||
{?PINGRESP, Rest} ->
|
% wrap(Header, #mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
% qos_table = parse_qos(Rest1, []) }, Rest);
|
||||||
|
{?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
|
1 = Qos,
|
||||||
|
<<PacketId:16/big, Rest1/binary>> = FrameBin,
|
||||||
|
Topics = parse_topics(?UNSUBSCRIBE, Rest1, []),
|
||||||
|
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
topics = Topics}, Rest);
|
||||||
|
%{?UNSUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
|
% <<PacketId:16/big>> = FrameBin,
|
||||||
|
% wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest);
|
||||||
|
{?PINGREQ, Rest} ->
|
||||||
|
Length = 0,
|
||||||
|
wrap(Header, Rest);
|
||||||
|
%{?PINGRESP, Rest} ->
|
||||||
|
% Length = 0,
|
||||||
|
% wrap(Header, Rest);
|
||||||
|
{?DISCONNECT, Rest} ->
|
||||||
Length = 0,
|
Length = 0,
|
||||||
wrap(Header, Rest);
|
wrap(Header, Rest);
|
||||||
{_, TooShortBin} ->
|
{_, TooShortBin} ->
|
||||||
|
@ -108,33 +175,44 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
||||||
end}
|
end}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_qos(<<>>, Acc) ->
|
wrap(Header, Variable, Payload, Rest) ->
|
||||||
lists:reverse(Acc);
|
{ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
|
||||||
parse_qos(<<QoS:8/unsigned, Rest/binary>>, Acc) ->
|
wrap(Header, Variable, Rest) ->
|
||||||
parse_qos(Rest, [QoS | Acc]).
|
{ok, #mqtt_packet {header = Header, variable = Variable}, Rest}.
|
||||||
|
wrap(Header, Rest) ->
|
||||||
|
{ok, #mqtt_packet {header = Header}, Rest}.
|
||||||
|
|
||||||
%parse_utf(Bin, 0) ->
|
%client function
|
||||||
% {undefined, Bin};
|
%parse_qos(<<>>, Acc) ->
|
||||||
%parse_utf(Bin, _) ->
|
% lists:reverse(Acc);
|
||||||
% parse_utf(Bin).
|
%parse_qos(<<QoS:8/unsigned, Rest/binary>>, Acc) ->
|
||||||
|
% parse_qos(Rest, [QoS | Acc]).
|
||||||
|
|
||||||
|
parse_topics(_, <<>>, Topics) ->
|
||||||
|
Topics;
|
||||||
|
parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
|
||||||
|
{Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
|
||||||
|
parse_topics(Sub, Rest, [{Name, QoS}| Topics]);
|
||||||
|
parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
|
||||||
|
{Name, <<Rest/binary>>} = parse_utf(Bin),
|
||||||
|
parse_topics(Sub, Rest, [Name | Topics]).
|
||||||
|
|
||||||
|
parse_utf(Bin, 0) ->
|
||||||
|
{undefined, Bin};
|
||||||
|
parse_utf(Bin, _) ->
|
||||||
|
parse_utf(Bin).
|
||||||
|
|
||||||
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
||||||
{Str, Rest}.
|
{Str, Rest}.
|
||||||
|
|
||||||
%parse_msg(Bin, 0) ->
|
parse_msg(Bin, 0) ->
|
||||||
% {undefined, Bin};
|
{undefined, Bin};
|
||||||
%parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
|
parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
|
||||||
% {Msg, Rest}.
|
{Msg, Rest}.
|
||||||
|
|
||||||
wrap(Header, Variable, Payload, Rest) ->
|
|
||||||
{ok, #mqtt_packet{ header = Header,
|
|
||||||
variable = Variable,
|
|
||||||
payload = Payload }, Rest}.
|
|
||||||
wrap(Header, Variable, Rest) ->
|
|
||||||
{ok, #mqtt_packet { header = Header,
|
|
||||||
variable = Variable }, Rest}.
|
|
||||||
wrap(Header, Rest) ->
|
|
||||||
{ok, #mqtt_packet { header = Header }, Rest}.
|
|
||||||
|
|
||||||
bool(0) -> false;
|
bool(0) -> false;
|
||||||
bool(1) -> true.
|
bool(1) -> true.
|
||||||
|
|
||||||
|
protocol_name_approved(Ver, Name) ->
|
||||||
|
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
||||||
|
|
||||||
|
|
|
@ -20,15 +20,15 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttc packet serialiser.
|
%%% emqtt packet serialiser.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttc_serialiser).
|
-module(emqtt_serialiser).
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqttc_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([serialise/1]).
|
-export([serialise/1]).
|
||||||
|
@ -36,24 +36,24 @@
|
||||||
|
|
||||||
%%TODO: doc and spec...
|
%%TODO: doc and spec...
|
||||||
|
|
||||||
serialise(#mqtt_packet{ header = Header = #mqtt_packet_header{ type = Type },
|
serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
||||||
variable = Variable,
|
variable = Variable,
|
||||||
payload = Payload }) ->
|
payload = Payload}) ->
|
||||||
serialise_header(Header,
|
serialise_header(Header,
|
||||||
serialise_variable(Type, Variable,
|
serialise_variable(Type, Variable,
|
||||||
serialise_payload(Payload))).
|
serialise_payload(Payload))).
|
||||||
|
|
||||||
serialise_header(#mqtt_packet_header{ type = Type,
|
serialise_header(#mqtt_packet_header{type = Type,
|
||||||
dup = Dup,
|
dup = Dup,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
retain = Retain },
|
retain = Retain},
|
||||||
{VariableBin, PayloadBin})
|
{VariableBin, PayloadBin})
|
||||||
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
|
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
|
||||||
Len = size(VariableBin) + size(PayloadBin),
|
Len = size(VariableBin) + size(PayloadBin),
|
||||||
true = (Len =< ?MAX_LEN),
|
true = (Len =< ?MAX_LEN),
|
||||||
LenBin = serialise_len(Len),
|
LenBin = serialise_len(Len),
|
||||||
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
|
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
|
||||||
LenBin/binary, VariableBin/binary, PayloadBin/binary>>.
|
LenBin/binary, VariableBin/binary, PayloadBin/binary>>.
|
||||||
|
|
||||||
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
|
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
|
@ -89,14 +89,28 @@ serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
|
||||||
UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>,
|
UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>,
|
||||||
{VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
|
{VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
|
||||||
|
|
||||||
|
serialise_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
|
||||||
|
return_code = ReturnCode },
|
||||||
|
undefined) ->
|
||||||
|
{<<AckFlags:8, ReturnCode:8>>, <<>>};
|
||||||
|
|
||||||
serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
|
serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
topic_table = Topics }, undefined) ->
|
topic_table = Topics }, undefined) ->
|
||||||
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
||||||
|
|
||||||
|
serialise_variable(?SUBACK, #mqtt_packet_suback {packet_id = PacketId,
|
||||||
|
qos_table = QosTable},
|
||||||
|
undefined)
|
||||||
|
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
|
||||||
|
|
||||||
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{
|
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{
|
||||||
packet_id = PacketId, topics = Topics }, undefined) ->
|
packet_id = PacketId, topics = Topics }, undefined) ->
|
||||||
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
||||||
|
|
||||||
|
serialise_variable(?UNSUBACK, #mqtt_packet_suback {packet_id = PacketId},
|
||||||
|
undefined)
|
||||||
|
{<<PacketId:16/big>>, <<>>};
|
||||||
|
|
||||||
serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName,
|
serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName,
|
||||||
packet_id = PacketId }, PayloadBin) ->
|
packet_id = PacketId }, PayloadBin) ->
|
||||||
TopicBin = serialise_utf(TopicName),
|
TopicBin = serialise_utf(TopicName),
|
||||||
|
@ -143,3 +157,4 @@ opt(false) -> 0;
|
||||||
opt(true) -> 1;
|
opt(true) -> 1;
|
||||||
opt(X) when is_integer(X) -> X;
|
opt(X) when is_integer(X) -> X;
|
||||||
opt(B) when is_binary(B) -> 1.
|
opt(B) when is_binary(B) -> 1.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue