serializer

This commit is contained in:
Feng Lee 2015-12-09 15:41:03 +08:00
parent 8f22b0ffbc
commit 05bf24b33b
2 changed files with 38 additions and 38 deletions

View File

@ -260,7 +260,7 @@ send(Packet, State = #proto_state{sendfun = SendFun})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
Data = emqttd_serialiser:serialise(Packet), Data = emqttd_serialiser:serialize(Packet),
?LOG(debug, "SEND ~p", [Data], State), ?LOG(debug, "SEND ~p", [Data], State),
emqttd_metrics:inc('bytes/sent', size(Data)), emqttd_metrics:inc('bytes/sent', size(Data)),
SendFun(Data), SendFun(Data),

View File

@ -19,45 +19,45 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc MQTT Packet Serialiser %%% @doc MQTT Packet Serializer
%%% %%%
%%% @author Feng Lee <feng@emqtt.io> %%% @author Feng Lee <feng@emqtt.io>
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_serialiser). -module(emqttd_serializer).
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API %% API
-export([serialise/1]). -export([serialize/1]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Serialise MQTT Packet %% @doc Serialise MQTT Packet
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec serialise(mqtt_packet()) -> binary(). -spec serialize(mqtt_packet()) -> binary().
serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type}, serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
variable = Variable, variable = Variable,
payload = Payload}) -> payload = Payload}) ->
serialise_header(Header, serialize_header(Header,
serialise_variable(Type, Variable, serialize_variable(Type, Variable,
serialise_payload(Payload))). serialize_payload(Payload))).
serialise_header(#mqtt_packet_header{type = Type, serialize_header(#mqtt_packet_header{type = Type,
dup = Dup, dup = Dup,
qos = Qos, qos = Qos,
retain = Retain}, retain = Retain},
{VariableBin, PayloadBin}) when ?CONNECT =< Type andalso Type =< ?DISCONNECT -> {VariableBin, PayloadBin}) when ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
Len = size(VariableBin) + size(PayloadBin), Len = size(VariableBin) + size(PayloadBin),
true = (Len =< ?MAX_LEN), true = (Len =< ?MAX_LEN),
LenBin = serialise_len(Len), LenBin = serialize_len(Len),
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1, <<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
LenBin/binary, LenBin/binary,
VariableBin/binary, VariableBin/binary,
PayloadBin/binary>>. PayloadBin/binary>>.
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, serialize_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
proto_ver = ProtoVer, proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
will_retain = WillRetain, will_retain = WillRetain,
@ -80,79 +80,79 @@ serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
(opt(CleanSess)):1, (opt(CleanSess)):1,
0:1, 0:1,
KeepAlive:16/big-unsigned-integer>>, KeepAlive:16/big-unsigned-integer>>,
PayloadBin = serialise_utf(ClientId), PayloadBin = serialize_utf(ClientId),
PayloadBin1 = case WillFlag of PayloadBin1 = case WillFlag of
true -> <<PayloadBin/binary, true -> <<PayloadBin/binary,
(serialise_utf(WillTopic))/binary, (serialize_utf(WillTopic))/binary,
(size(WillMsg)):16/big-unsigned-integer, (size(WillMsg)):16/big-unsigned-integer,
WillMsg/binary>>; WillMsg/binary>>;
false -> PayloadBin false -> PayloadBin
end, end,
UserPasswd = << <<(serialise_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>, UserPasswd = << <<(serialize_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>,
{VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>}; {VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
serialise_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags, serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}, undefined) -> return_code = ReturnCode}, undefined) ->
{<<AckFlags:8, ReturnCode:8>>, <<>>}; {<<AckFlags:8, ReturnCode:8>>, <<>>};
serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = Topics }, undefined) -> topic_table = Topics }, undefined) ->
{<<PacketId:16/big>>, serialise_topics(Topics)}; {<<PacketId:16/big>>, serialize_topics(Topics)};
serialise_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId, serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}, undefined) -> qos_table = QosTable}, undefined) ->
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>}; {<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId, serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId,
topics = Topics }, undefined) -> topics = Topics }, undefined) ->
{<<PacketId:16/big>>, serialise_topics(Topics)}; {<<PacketId:16/big>>, serialize_topics(Topics)};
serialise_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) -> serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) ->
{<<PacketId:16/big>>, <<>>}; {<<PacketId:16/big>>, <<>>};
serialise_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName, serialize_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId }, PayloadBin) -> packet_id = PacketId }, PayloadBin) ->
TopicBin = serialise_utf(TopicName), TopicBin = serialize_utf(TopicName),
PacketIdBin = if PacketIdBin = if
PacketId =:= undefined -> <<>>; PacketId =:= undefined -> <<>>;
true -> <<PacketId:16/big>> true -> <<PacketId:16/big>>
end, end,
{<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin}; {<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin};
serialise_variable(PubAck, #mqtt_packet_puback{packet_id = PacketId}, _Payload) serialize_variable(PubAck, #mqtt_packet_puback{packet_id = PacketId}, _Payload)
when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP ->
{<<PacketId:16/big>>, <<>>}; {<<PacketId:16/big>>, <<>>};
serialise_variable(?PINGREQ, undefined, undefined) -> serialize_variable(?PINGREQ, undefined, undefined) ->
{<<>>, <<>>}; {<<>>, <<>>};
serialise_variable(?PINGRESP, undefined, undefined) -> serialize_variable(?PINGRESP, undefined, undefined) ->
{<<>>, <<>>}; {<<>>, <<>>};
serialise_variable(?DISCONNECT, undefined, undefined) -> serialize_variable(?DISCONNECT, undefined, undefined) ->
{<<>>, <<>>}. {<<>>, <<>>}.
serialise_payload(undefined) -> serialize_payload(undefined) ->
undefined; undefined;
serialise_payload(Bin) when is_binary(Bin) -> serialize_payload(Bin) when is_binary(Bin) ->
Bin. Bin.
serialise_topics([{_Topic, _Qos}|_] = Topics) -> serialize_topics([{_Topic, _Qos}|_] = Topics) ->
<< <<(serialise_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>; << <<(serialize_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>;
serialise_topics([H|_] = Topics) when is_binary(H) -> serialize_topics([H|_] = Topics) when is_binary(H) ->
<< <<(serialise_utf(Topic))/binary>> || Topic <- Topics >>. << <<(serialize_utf(Topic))/binary>> || Topic <- Topics >>.
serialise_utf(String) -> serialize_utf(String) ->
StringBin = unicode:characters_to_binary(String), StringBin = unicode:characters_to_binary(String),
Len = size(StringBin), Len = size(StringBin),
true = (Len =< 16#ffff), true = (Len =< 16#ffff),
<<Len:16/big, StringBin/binary>>. <<Len:16/big, StringBin/binary>>.
serialise_len(N) when N =< ?LOWBITS -> serialize_len(N) when N =< ?LOWBITS ->
<<0:1, N:7>>; <<0:1, N:7>>;
serialise_len(N) -> serialize_len(N) ->
<<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>. <<1:1, (N rem ?HIGHBIT):7, (serialize_len(N div ?HIGHBIT))/binary>>.
opt(undefined) -> ?RESERVED; opt(undefined) -> ?RESERVED;
opt(false) -> 0; opt(false) -> 0;