merge emqttc packet header
This commit is contained in:
parent
c6668c6dc9
commit
b11026788a
|
@ -99,7 +99,7 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
|
|||
|
||||
%%TODO: ok??
|
||||
handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
|
||||
{ok, ProtoState1} = emqtt_protocol:send_message({From, Message}, ProtoState),
|
||||
{ok, ProtoState1} = emqtt_protocol:send({From, Message}, ProtoState),
|
||||
{noreply, State#state{proto_state = ProtoState1}};
|
||||
|
||||
handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%% SOFTWARE.
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-module(emqtt_net).
|
||||
|
||||
-author('feng@emqtt.io').
|
||||
|
@ -237,4 +236,3 @@ hostname() ->
|
|||
{error, _Reason} -> Hostname
|
||||
end.
|
||||
|
||||
|
||||
|
|
|
@ -1,42 +1,43 @@
|
|||
%%-----------------------------------------------------------------------------
|
||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%
|
||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%% of this software and associated documentation files (the "Software"), to deal
|
||||
%% in the Software without restriction, including without limitation the rights
|
||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%% copies of the Software, and to permit persons to whom the Software is
|
||||
%% furnished to do so, subject to the following conditions:
|
||||
%%
|
||||
%% The above copyright notice and this permission notice shall be included in all
|
||||
%% copies or substantial portions of the Software.
|
||||
%%
|
||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%% SOFTWARE.
|
||||
%%------------------------------------------------------------------------------
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqtt protocol.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqtt_protocol).
|
||||
|
||||
-include("emqtt.hrl").
|
||||
|
||||
-include("emqtt_packet.hrl").
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% API Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
%% API
|
||||
-export([init/3, client_id/1]).
|
||||
|
||||
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
|
||||
-export([received/2, send/2, redeliver/2, shutdown/2]).
|
||||
|
||||
-export([info/1]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% Protocol State
|
||||
%% ------------------------------------------------------------------
|
||||
-record(proto_state, {
|
||||
transport,
|
||||
socket,
|
||||
|
@ -51,71 +52,62 @@
|
|||
will_msg
|
||||
}).
|
||||
|
||||
-type(proto_state() :: #proto_state{}).
|
||||
|
||||
-spec(send_message({pid() | tuple(), mqtt_message()}, proto_state()) -> {ok, proto_state()}).
|
||||
|
||||
-spec(handle_packet(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
||||
|
||||
-define(PACKET_TYPE(Packet, Type),
|
||||
Packet = #mqtt_packet{header = #mqtt_packet_header { type = Type }}).
|
||||
|
||||
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
|
||||
-type proto_state() :: #proto_state{}.
|
||||
|
||||
init(Transport, Socket, Peername) ->
|
||||
#proto_state{
|
||||
transport = Transport,
|
||||
socket = Socket,
|
||||
peer_name = Peername
|
||||
}.
|
||||
transport = Transport,
|
||||
socket = Socket,
|
||||
peer_name = Peername}.
|
||||
|
||||
client_id(#proto_state { client_id = ClientId }) -> ClientId.
|
||||
client_id(#proto_state{client_id = ClientId}) -> ClientId.
|
||||
|
||||
%%SHOULD be registered in emqtt_cm
|
||||
info(#proto_state{ proto_vsn = ProtoVsn,
|
||||
proto_name = ProtoName,
|
||||
client_id = ClientId,
|
||||
clean_sess = CleanSess,
|
||||
will_msg = WillMsg }) ->
|
||||
[ {proto_vsn, ProtoVsn},
|
||||
{proto_name, ProtoName},
|
||||
{client_id, ClientId},
|
||||
{clean_sess, CleanSess},
|
||||
{will_msg, WillMsg} ].
|
||||
|
||||
info(#proto_state{proto_vsn = ProtoVsn,
|
||||
proto_name = ProtoName,
|
||||
client_id = ClientId,
|
||||
clean_sess = CleanSess,
|
||||
will_msg = WillMsg}) ->
|
||||
[{proto_vsn, ProtoVsn},
|
||||
{proto_name, ProtoName},
|
||||
{client_id, ClientId},
|
||||
{clean_sess, CleanSess},
|
||||
{will_msg, WillMsg}].
|
||||
|
||||
%%CONNECT – Client requests a connection to a Server
|
||||
|
||||
%%A Client can only send the CONNECT Packet once over a Network Connection.
|
||||
handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) ->
|
||||
handle_packet(?CONNECT, Packet, State#proto_state{connected = true});
|
||||
-spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
|
||||
received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
|
||||
handle(Packet, State#proto_state{connected = true});
|
||||
|
||||
handle_packet(?PACKET_TYPE(_Packet, ?CONNECT), State = #proto_state{connected = true}) ->
|
||||
received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
|
||||
{error, protocol_bad_connect, State};
|
||||
|
||||
%%Received other packets when CONNECT not arrived.
|
||||
handle_packet(_Packet, State = #proto_state{connected = false}) ->
|
||||
received(_Packet, State = #proto_state{connected = false}) ->
|
||||
{error, protocol_not_connected, State};
|
||||
|
||||
handle_packet(?PACKET_TYPE(Packet, Type),
|
||||
State = #proto_state { peer_name = PeerName, client_id = ClientId }) ->
|
||||
received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName,
|
||||
client_id = ClientId}) ->
|
||||
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
||||
case validate_packet(Packet) of
|
||||
ok ->
|
||||
handle_packet(Type, Packet, State);
|
||||
handle(Packet, State);
|
||||
{error, Reason} ->
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
||||
handle_packet(?CONNECT, Packet = #mqtt_packet {
|
||||
variable = #mqtt_packet_connect {
|
||||
username = Username,
|
||||
password = Password,
|
||||
clean_sess = CleanSess,
|
||||
keep_alive = KeepAlive,
|
||||
client_id = ClientId } = Var },
|
||||
State = #proto_state{ peer_name = PeerName } ) ->
|
||||
handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}) ->
|
||||
|
||||
#mqtt_packet_connect{username = Username,
|
||||
password = Password,
|
||||
clean_sess = CleanSess,
|
||||
keep_alive = KeepAlive,
|
||||
client_id = ClientId} = Var,
|
||||
|
||||
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
||||
|
||||
{ReturnCode1, State1} =
|
||||
case validate_connect(Var) of
|
||||
?CONNACK_ACCEPT ->
|
||||
|
@ -124,9 +116,9 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
|
|||
ClientId1 = clientid(ClientId, State),
|
||||
start_keepalive(KeepAlive),
|
||||
emqtt_cm:register(ClientId1, self()),
|
||||
{?CONNACK_ACCEPT, State#proto_state{ will_msg = willmsg(Var),
|
||||
clean_sess = CleanSess,
|
||||
client_id = ClientId1 }};
|
||||
{?CONNACK_ACCEPT, State#proto_state{will_msg = willmsg(Var),
|
||||
clean_sess = CleanSess,
|
||||
client_id = ClientId1}};
|
||||
false ->
|
||||
lager:error("~s@~s: username '~s' login failed - no credentials", [ClientId, PeerName, Username]),
|
||||
{?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}}
|
||||
|
@ -134,108 +126,72 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
|
|||
ReturnCode ->
|
||||
{ReturnCode, State#proto_state{client_id = ClientId}}
|
||||
end,
|
||||
send_packet( #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?CONNACK },
|
||||
variable = #mqtt_packet_connack{ return_code = ReturnCode1 }}, State1 ),
|
||||
send(?CONNACK_PACKET(ReturnCode1), State1),
|
||||
%%Starting session
|
||||
{ok, Session} = emqtt_session:start({CleanSess, ClientId, self()}),
|
||||
{ok, State1#proto_state { session = Session }};
|
||||
{ok, State1#proto_state{session = Session}};
|
||||
|
||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||
header = #mqtt_packet_header {qos = ?QOS_0}},
|
||||
State = #proto_state{session = Session}) ->
|
||||
handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
||||
State = #proto_state{session = Session}) ->
|
||||
emqtt_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)}),
|
||||
{ok, State};
|
||||
|
||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||
header = #mqtt_packet_header { qos = ?QOS_1 },
|
||||
variable = #mqtt_packet_publish{packet_id = PacketId }},
|
||||
State = #proto_state { session = Session }) ->
|
||||
handle(Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, PacketId, _Payload),
|
||||
State = #proto_state{session = Session}) ->
|
||||
emqtt_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}),
|
||||
send_packet( make_packet(?PUBACK, PacketId), State);
|
||||
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
||||
|
||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||
header = #mqtt_packet_header { qos = ?QOS_2 },
|
||||
variable = #mqtt_packet_publish { packet_id = PacketId } },
|
||||
State = #proto_state { session = Session }) ->
|
||||
handle(Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, PacketId, _Payload),
|
||||
State = #proto_state{session = Session}) ->
|
||||
NewSession = emqtt_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}),
|
||||
send_packet( make_packet(?PUBREC, PacketId), State#proto_state {session = NewSession} );
|
||||
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
|
||||
|
||||
handle_packet(Puback, #mqtt_packet{variable = ?PUBACK_PACKET(PacketId) },
|
||||
State = #proto_state { session = Session })
|
||||
when Puback >= ?PUBACK andalso Puback =< ?PUBCOMP ->
|
||||
|
||||
NewSession = emqtt_session:puback(Session, {Puback, PacketId}),
|
||||
NewState = State#proto_state {session = NewSession},
|
||||
handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
|
||||
when Type >= ?PUBACK andalso Type =< ?PUBCOMP ->
|
||||
NewSession = emqtt_session:puback(Session, {Type, PacketId}),
|
||||
NewState = State#proto_state{session = NewSession},
|
||||
if
|
||||
Puback =:= ?PUBREC ->
|
||||
send_packet( make_packet(?PUBREL, PacketId), NewState);
|
||||
Puback =:= ?PUBREL ->
|
||||
send_packet( make_packet(?PUBCOMP, PacketId), NewState);
|
||||
Type =:= ?PUBREC ->
|
||||
send(?PUBREL_PACKET(PacketId), NewState);
|
||||
Type =:= ?PUBREL ->
|
||||
send(?PUBACK_PACKET(?PUBCOMP, PacketId), NewState);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
{ok, NewState};
|
||||
|
||||
handle_packet(?SUBSCRIBE, #mqtt_packet {
|
||||
variable = #mqtt_packet_subscribe{
|
||||
packet_id = PacketId,
|
||||
topic_table = TopicTable},
|
||||
payload = undefined},
|
||||
State = #proto_state { session = Session } ) ->
|
||||
|
||||
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
|
||||
{ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, TopicTable),
|
||||
send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
||||
variable = #mqtt_packet_suback{ packet_id = PacketId,
|
||||
qos_table = GrantedQos }},
|
||||
State#proto_state{ session = NewSession });
|
||||
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession});
|
||||
|
||||
handle_packet(?UNSUBSCRIBE, #mqtt_packet{
|
||||
variable = #mqtt_packet_unsubscribe{
|
||||
packet_id = PacketId,
|
||||
topics = Topics},
|
||||
payload = undefined},
|
||||
State = #proto_state{session = Session}) ->
|
||||
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
||||
{ok, NewSession} = emqtt_session:unsubscribe(Session, Topics),
|
||||
send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
||||
variable = #mqtt_packet_unsuback{packet_id = PacketId }},
|
||||
State#proto_state { session = NewSession } );
|
||||
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
|
||||
|
||||
handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
|
||||
send_packet(make_packet(?PINGRESP), State);
|
||||
handle(?PACKET(?PINGREQ), State) ->
|
||||
send(?PACKET(?PINGRESP), State);
|
||||
|
||||
handle_packet(?DISCONNECT, #mqtt_packet{}, State) ->
|
||||
handle(?PACKET(?DISCONNECT), State) ->
|
||||
%%TODO: how to handle session?
|
||||
% clean willmsg
|
||||
{stop, normal, State#proto_state{will_msg = undefined}}.
|
||||
|
||||
make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
|
||||
#mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
|
||||
|
||||
make_packet(PubAck, PacketId) when PubAck >= ?PUBACK andalso PubAck =< ?PUBCOMP ->
|
||||
#mqtt_packet { header = #mqtt_packet_header { type = PubAck, qos = puback_qos(PubAck) },
|
||||
variable = #mqtt_packet_puback { packet_id = PacketId}}.
|
||||
|
||||
puback_qos(?PUBACK) -> ?QOS_0;
|
||||
puback_qos(?PUBREC) -> ?QOS_0;
|
||||
puback_qos(?PUBREL) -> ?QOS_1;
|
||||
puback_qos(?PUBCOMP) -> ?QOS_0.
|
||||
|
||||
-spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
||||
%% qos0 message
|
||||
send_message({_From, Message = #mqtt_message{ qos = ?QOS_0 }}, State) ->
|
||||
send_packet(emqtt_message:to_packet(Message), State);
|
||||
send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
|
||||
send(emqtt_message:to_packet(Message), State);
|
||||
|
||||
%% message from session
|
||||
send_message({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
|
||||
send_packet(emqtt_message:to_packet(Message), State);
|
||||
send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
|
||||
send(emqtt_message:to_packet(Message), State);
|
||||
|
||||
%% message(qos1, qos2) not from session
|
||||
send_message({_From, Message = #mqtt_message{ qos = Qos }}, State = #proto_state{ session = Session })
|
||||
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
|
||||
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
|
||||
{Message1, NewSession} = emqtt_session:store(Session, Message),
|
||||
send_packet(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}).
|
||||
send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
|
||||
|
||||
send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
||||
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
|
||||
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
||||
Data = emqtt_serialiser:serialise(Packet),
|
||||
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
|
||||
|
@ -246,7 +202,7 @@ send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, p
|
|||
%% @doc redeliver PUBREL PacketId
|
||||
%%
|
||||
redeliver({?PUBREL, PacketId}, State) ->
|
||||
send_packet( make_packet(?PUBREL, PacketId), State).
|
||||
send(?PUBREL_PACKET(PacketId), State).
|
||||
|
||||
shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) ->
|
||||
send_willmsg(WillMsg),
|
||||
|
@ -275,7 +231,7 @@ start_keepalive(Sec) when Sec > 0 ->
|
|||
%%----------------------------------------------------------------------------
|
||||
%% Validate Packets
|
||||
%%----------------------------------------------------------------------------
|
||||
validate_connect( Connect = #mqtt_packet_connect{} ) ->
|
||||
validate_connect(Connect = #mqtt_packet_connect{}) ->
|
||||
case validate_protocol(Connect) of
|
||||
true ->
|
||||
case validate_clientid(Connect) of
|
||||
|
@ -288,36 +244,36 @@ validate_connect( Connect = #mqtt_packet_connect{} ) ->
|
|||
?CONNACK_PROTO_VER
|
||||
end.
|
||||
|
||||
validate_protocol(#mqtt_packet_connect { proto_ver = Ver, proto_name = Name }) ->
|
||||
validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
|
||||
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
||||
|
||||
validate_clientid(#mqtt_packet_connect { client_id = ClientId })
|
||||
validate_clientid(#mqtt_packet_connect{client_id = ClientId})
|
||||
when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< ?MAX_CLIENTID_LEN ) ->
|
||||
true;
|
||||
|
||||
%% MQTT3.1.1 allow null clientId.
|
||||
validate_clientid(#mqtt_packet_connect { proto_ver =?MQTT_PROTO_V311, client_id = ClientId })
|
||||
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId})
|
||||
when size(ClientId) =:= 0 ->
|
||||
true;
|
||||
|
||||
validate_clientid(#mqtt_packet_connect { proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId }) ->
|
||||
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}) ->
|
||||
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
|
||||
false.
|
||||
|
||||
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH },
|
||||
variable = #mqtt_packet_publish{ topic_name = Topic }}) ->
|
||||
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
||||
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
||||
case emqtt_topic:validate({name, Topic}) of
|
||||
true -> ok;
|
||||
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
||||
end;
|
||||
|
||||
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBSCRIBE },
|
||||
variable = #mqtt_packet_subscribe{topic_table = Topics }}) ->
|
||||
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE},
|
||||
variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
||||
|
||||
validate_topics(filter, Topics);
|
||||
|
||||
validate_packet(#mqtt_packet{ header = #mqtt_packet_header { type = ?UNSUBSCRIBE },
|
||||
variable = #mqtt_packet_subscribe{ topic_table = Topics }}) ->
|
||||
validate_packet(#mqtt_packet{ header = #mqtt_packet_header{type = ?UNSUBSCRIBE},
|
||||
variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
||||
|
||||
validate_topics(filter, Topics);
|
||||
|
||||
|
|
|
@ -1,160 +0,0 @@
|
|||
%%-----------------------------------------------------------------------------
|
||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%
|
||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%% of this software and associated documentation files (the "Software"), to deal
|
||||
%% in the Software without restriction, including without limitation the rights
|
||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%% copies of the Software, and to permit persons to whom the Software is
|
||||
%% furnished to do so, subject to the following conditions:
|
||||
%%
|
||||
%% The above copyright notice and this permission notice shall be included in all
|
||||
%% copies or substantial portions of the Software.
|
||||
%%
|
||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%% SOFTWARE.
|
||||
%%------------------------------------------------------------------------------
|
||||
-module(emqtt_packet_tests).
|
||||
|
||||
-include("emqtt_packet.hrl").
|
||||
|
||||
-import(emqtt_packet, [initial_state/0, parse/2, serialise/1]).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
parse_connect_test() ->
|
||||
State = initial_state(),
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header { type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect { proto_ver = 3,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>}, parse(V31ConnBin, State)),
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header { type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect { proto_ver = 4,
|
||||
proto_name = <<"MQTT">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>}, parse(V311ConnBin, State)),
|
||||
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
||||
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header { type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect { proto_ver = 4,
|
||||
proto_name = <<"MQTT">>,
|
||||
client_id = <<>>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>}, parse(V311ConnWithoutClientId, State)),
|
||||
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header { type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect { proto_ver = 3,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
client_id = <<"mosqpub/10452-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60,
|
||||
will_retain = false,
|
||||
will_qos = 1,
|
||||
will_flag = true,
|
||||
will_topic = <<"/will">>,
|
||||
will_msg = <<"willmsg">> ,
|
||||
username = <<"test">>,
|
||||
password = <<"public">> } },
|
||||
<<>> }, parse(ConnBinWithWill, State)),
|
||||
ok.
|
||||
|
||||
parse_publish_test() ->
|
||||
State = initial_state(),
|
||||
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
||||
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?PUBLISH,
|
||||
dup = false,
|
||||
qos = 1,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_publish { topic_name = <<"a/b/c">>,
|
||||
packet_id = 1 },
|
||||
payload = <<"hahah">> }, <<>>}, parse(PubBin, State)),
|
||||
|
||||
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?PUBLISH,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_publish { topic_name = <<"xxx/yyy">>,
|
||||
packet_id = undefined },
|
||||
payload = <<"hello">> }, <<224,0>>}, parse(PubBin1, State)),
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header { type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}
|
||||
}, <<>>}, parse(<<224, 0>>, State)).
|
||||
|
||||
parse_puback_test() ->
|
||||
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||
PubAckBin = <<64,2,0,1>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?PUBACK,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false }
|
||||
}, <<>>}, parse(PubAckBin, initial_state())),
|
||||
ok.
|
||||
|
||||
parse_subscribe_test() ->
|
||||
ok.
|
||||
|
||||
parse_pingreq_test() ->
|
||||
ok.
|
||||
|
||||
parse_disconnect_test() ->
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
Bin = <<224, 0>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false }
|
||||
}, <<>>}, parse(Bin, initial_state())).
|
||||
|
||||
serialise_connack_test() ->
|
||||
ConnAck = #mqtt_packet{ header = #mqtt_packet_header { type = ?CONNACK },
|
||||
variable = #mqtt_packet_connack { ack_flags = 0, return_code = 0 } },
|
||||
?assertEqual(<<32,2,0,0>>, emqtt_packet:serialise(ConnAck)).
|
||||
|
||||
serialise_puback_test() ->
|
||||
ok.
|
||||
|
||||
-endif.
|
||||
|
|
@ -0,0 +1,155 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqtt_parser tests.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqtt_parser_tests).
|
||||
|
||||
-include("emqtt_packet.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
parse_connect_test() ->
|
||||
State = emqtt_parser:init(),
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect{proto_ver = 3,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60}}, <<>>}, emqtt_parser:parse(V31ConnBin, State)),
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect{proto_ver = 4,
|
||||
proto_name = <<"MQTT">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnBin, State)),
|
||||
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
||||
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect{proto_ver = 4,
|
||||
proto_name = <<"MQTT">>,
|
||||
client_id = <<>>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnWithoutClientId, State)),
|
||||
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_connect{proto_ver = 3,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
client_id = <<"mosqpub/10452-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60,
|
||||
will_retain = false,
|
||||
will_qos = 1,
|
||||
will_flag = true,
|
||||
will_topic = <<"/will">>,
|
||||
will_msg = <<"willmsg">> ,
|
||||
username = <<"test">>,
|
||||
password = <<"public">>}},
|
||||
<<>> }, emqtt_parser:parse(ConnBinWithWill, State)),
|
||||
ok.
|
||||
|
||||
parse_publish_test() ->
|
||||
State = emqtt_parser:init(),
|
||||
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
||||
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
dup = false,
|
||||
qos = 1,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
||||
packet_id = 1},
|
||||
payload = <<"hahah">> }, <<>>}, emqtt_parser:parse(PubBin, State)),
|
||||
|
||||
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false},
|
||||
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
||||
packet_id = undefined},
|
||||
payload = <<"hello">> }, <<224,0>>}, emqtt_parser:parse(PubBin1, State)),
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}
|
||||
}, <<>>}, emqtt_parser:parse(<<224, 0>>, State)).
|
||||
|
||||
parse_puback_test() ->
|
||||
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||
PubAckBin = <<64,2,0,1>>,
|
||||
?assertMatch({ok, #mqtt_packet {
|
||||
header = #mqtt_packet_header{type = ?PUBACK,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false }
|
||||
}, <<>>}, emqtt_parser:parse(PubAckBin, emqtt_parser:init())),
|
||||
ok.
|
||||
|
||||
parse_subscribe_test() ->
|
||||
ok.
|
||||
|
||||
parse_pingreq_test() ->
|
||||
ok.
|
||||
|
||||
parse_disconnect_test() ->
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
Bin = <<224, 0>>,
|
||||
?assertMatch({ok, #mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}
|
||||
}, <<>>}, emqtt_parser:parse(Bin, emqtt_parser:init())).
|
||||
|
||||
-endif.
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqtt_serialiser tests.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqtt_serialiser_tests).
|
||||
|
||||
-include("emqtt_packet.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
serialise_connack_test() ->
|
||||
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
||||
?assertEqual(<<32,2,0,0>>, emqtt_serialiser:serialise(ConnAck)).
|
||||
|
||||
serialise_puback_test() ->
|
||||
ok.
|
||||
|
||||
-endif.
|
||||
|
Loading…
Reference in New Issue