This commit is contained in:
Ery Lee 2015-03-08 18:15:30 +08:00
parent b3178ffcd5
commit 00f39607f1
2 changed files with 49 additions and 44 deletions

View File

@ -33,7 +33,6 @@
%% API %% API
-export([init/0, parse/2]). -export([init/0, parse/2]).
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% @doc %% @doc
%% Initialize a parser. %% Initialize a parser.
@ -178,9 +177,9 @@ parse_frame(Bin, #mqtt_packet_header{type = Type,
wrap(Header, Variable, Payload, Rest) -> wrap(Header, Variable, Payload, Rest) ->
{ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
wrap(Header, Variable, Rest) -> wrap(Header, Variable, Rest) ->
{ok, #mqtt_packet {header = Header, variable = Variable}, Rest}. {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}.
wrap(Header, Rest) -> wrap(Header, Rest) ->
{ok, #mqtt_packet {header = Header}, Rest}. {ok, #mqtt_packet{header = Header}, Rest}.
%client function %client function
%parse_qos(<<>>, Acc) -> %parse_qos(<<>>, Acc) ->

View File

@ -33,6 +33,13 @@
%% API %% API
-export([serialise/1]). -export([serialise/1]).
%%------------------------------------------------------------------------------
%% @doc
%% Serialise MQTT Packet.
%%
%% @end
%%------------------------------------------------------------------------------
-spec serialise(mqtt_packet()) -> binary().
serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type}, serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
variable = Variable, variable = Variable,
payload = Payload}) -> payload = Payload}) ->
@ -40,76 +47,74 @@ serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
serialise_variable(Type, Variable, serialise_variable(Type, Variable,
serialise_payload(Payload))). serialise_payload(Payload))).
serialise_header(#mqtt_packet_header{type = Type, serialise_header(#mqtt_packet_header{type = Type,
dup = Dup, dup = Dup,
qos = Qos, qos = Qos,
retain = Retain}, retain = Retain},
{VariableBin, PayloadBin}) {VariableBin, PayloadBin}) when ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
Len = size(VariableBin) + size(PayloadBin), Len = size(VariableBin) + size(PayloadBin),
true = (Len =< ?MAX_LEN), true = (Len =< ?MAX_LEN),
LenBin = serialise_len(Len), LenBin = serialise_len(Len),
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1, <<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
LenBin/binary, VariableBin/binary, PayloadBin/binary>>. LenBin/binary,
VariableBin/binary,
PayloadBin/binary>>.
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
proto_ver = ProtoVer, proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
will_retain = WillRetain, will_retain = WillRetain,
will_qos = WillQos, will_qos = WillQos,
will_flag = WillFlag, will_flag = WillFlag,
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = KeepAlive, keep_alive = KeepAlive,
will_topic = WillTopic, will_topic = WillTopic,
will_msg = WillMsg, will_msg = WillMsg,
username = Username, username = Username,
password = Password }, undefined) -> password = Password}, undefined) ->
VariableBin = <<(size(ProtoName)):16/big-unsigned-integer, VariableBin = <<(size(ProtoName)):16/big-unsigned-integer,
ProtoName/binary, ProtoName/binary,
ProtoVer:8, ProtoVer:8,
(opt(Username)):1, (opt(Username)):1,
(opt(Password)):1, (opt(Password)):1,
(opt(WillRetain)):1, (opt(WillRetain)):1,
WillQos:2, WillQos:2,
(opt(WillFlag)):1, (opt(WillFlag)):1,
(opt(CleanSess)):1, (opt(CleanSess)):1,
0:1, 0:1,
KeepAlive:16/big-unsigned-integer>>, KeepAlive:16/big-unsigned-integer>>,
PayloadBin = serialise_utf(ClientId), PayloadBin = serialise_utf(ClientId),
PayloadBin1 = case WillFlag of PayloadBin1 = case WillFlag of
true -> <<PayloadBin/binary, true -> <<PayloadBin/binary,
(serialise_utf(WillTopic))/binary, (serialise_utf(WillTopic))/binary,
(size(WillMsg)):16/big-unsigned-integer, (size(WillMsg)):16/big-unsigned-integer,
WillMsg/binary>>; WillMsg/binary>>;
false -> PayloadBin false -> PayloadBin
end, end,
UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>, UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>,
{VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>}; {VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
serialise_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags, serialise_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}, return_code = ReturnCode}, undefined) ->
undefined) ->
{<<AckFlags:8, ReturnCode:8>>, <<>>}; {<<AckFlags:8, ReturnCode:8>>, <<>>};
serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = Topics }, undefined) -> topic_table = Topics }, undefined) ->
{<<PacketId:16/big>>, serialise_topics(Topics)}; {<<PacketId:16/big>>, serialise_topics(Topics)};
serialise_variable(?SUBACK, #mqtt_packet_suback {packet_id = PacketId, serialise_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}, qos_table = QosTable}, undefined) ->
undefined) ->
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>}; {<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{ serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId,
packet_id = PacketId, topics = Topics }, undefined) -> topics = Topics }, undefined) ->
{<<PacketId:16/big>>, serialise_topics(Topics)}; {<<PacketId:16/big>>, serialise_topics(Topics)};
serialise_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, serialise_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) ->
undefined) ->
{<<PacketId:16/big>>, <<>>}; {<<PacketId:16/big>>, <<>>};
serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName, serialise_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId }, PayloadBin) -> packet_id = PacketId }, PayloadBin) ->
TopicBin = serialise_utf(TopicName), TopicBin = serialise_utf(TopicName),
PacketIdBin = if PacketIdBin = if
PacketId =:= undefined -> <<>>; PacketId =:= undefined -> <<>>;
@ -117,7 +122,7 @@ serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName,
end, end,
{<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin}; {<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin};
serialise_variable(PubAck, #mqtt_packet_puback { packet_id = PacketId }, _Payload) serialise_variable(PubAck, #mqtt_packet_puback{packet_id = PacketId}, _Payload)
when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP ->
{<<PacketId:16/big>>, <<>>}; {<<PacketId:16/big>>, <<>>};
@ -157,3 +162,4 @@ opt(false) -> 0;
opt(true) -> 1; opt(true) -> 1;
opt(X) when is_integer(X) -> X; opt(X) when is_integer(X) -> X;
opt(B) when is_binary(B) -> 1. opt(B) when is_binary(B) -> 1.