merge emqttc code
This commit is contained in:
parent
e80a78b66e
commit
ebd056f60b
|
@ -19,6 +19,11 @@
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%% emqtt header.
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Banner
|
%% Banner
|
||||||
|
@ -31,16 +36,6 @@
|
||||||
|
|
||||||
-define(ERTS_MINIMUM, "6.0").
|
-define(ERTS_MINIMUM, "6.0").
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT QoS
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(QOS_0, 0).
|
|
||||||
-define(QOS_1, 1).
|
|
||||||
-define(QOS_2, 2).
|
|
||||||
|
|
||||||
-type mqtt_qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Client
|
%% MQTT Client
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -64,20 +59,6 @@
|
||||||
|
|
||||||
-type mqtt_session() :: #mqtt_session{}.
|
-type mqtt_session() :: #mqtt_session{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Message
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(mqtt_message, {
|
|
||||||
msgid :: integer() | undefined,
|
|
||||||
qos = ?QOS_0 :: mqtt_qos(),
|
|
||||||
retain = false :: boolean(),
|
|
||||||
dup = false :: boolean(),
|
|
||||||
topic :: binary(),
|
|
||||||
payload :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type mqtt_message() :: #mqtt_message{}.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT User Management
|
%% MQTT User Management
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -92,4 +73,3 @@
|
||||||
|
|
||||||
%%TODO: ClientId | Username --> Pub | Sub --> Topics
|
%%TODO: ClientId | Username --> Pub | Sub --> Topics
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,27 +1,30 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% Copyright (c) 2015, Feng Lee <feng@emqtt.io>
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
%%
|
%%%
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
%% in the Software without restriction, including without limitation the rights
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
%% furnished to do so, subject to the following conditions:
|
%%% furnished to do so, subject to the following conditions:
|
||||||
%%
|
%%%
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
%% copies or substantial portions of the Software.
|
%%% copies or substantial portions of the Software.
|
||||||
%%
|
%%%
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%
|
%%% @doc
|
||||||
%% The Original Code is from RabbitMQ.
|
%%% emqtt packet header.
|
||||||
%%
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Protocol Version and Levels
|
%% MQTT Protocol Version and Levels
|
||||||
|
@ -29,8 +32,27 @@
|
||||||
-define(MQTT_PROTO_V31, 3).
|
-define(MQTT_PROTO_V31, 3).
|
||||||
-define(MQTT_PROTO_V311, 4).
|
-define(MQTT_PROTO_V311, 4).
|
||||||
|
|
||||||
-define(PROTOCOL_NAMES, [{?MQTT_PROTO_V31, <<"MQIsdp">>}, {?MQTT_PROTO_V311, <<"MQTT">>}]).
|
-define(PROTOCOL_NAMES, [
|
||||||
|
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
||||||
|
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
||||||
|
|
||||||
|
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% QoS Levels
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(QOS_0, 0).
|
||||||
|
-define(QOS_1, 1).
|
||||||
|
-define(QOS_2, 2).
|
||||||
|
|
||||||
|
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).
|
||||||
|
|
||||||
|
-type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Max ClientId Length. Why 1024? NiDongDe!
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
-define(MAX_CLIENTID_LEN, 1024).
|
-define(MAX_CLIENTID_LEN, 1024).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -52,6 +74,24 @@
|
||||||
-define(PINGRESP, 13). %% PING response
|
-define(PINGRESP, 13). %% PING response
|
||||||
-define(DISCONNECT, 14). %% Client is disconnecting
|
-define(DISCONNECT, 14). %% Client is disconnecting
|
||||||
|
|
||||||
|
-define(TYPE_NAMES, [
|
||||||
|
'CONNECT',
|
||||||
|
'CONNACK',
|
||||||
|
'PUBLISH',
|
||||||
|
'PUBACK',
|
||||||
|
'PUBREC',
|
||||||
|
'PUBREL',
|
||||||
|
'PUBCOMP',
|
||||||
|
'SUBSCRIBE',
|
||||||
|
'SUBACK',
|
||||||
|
'UNSUBSCRIBE',
|
||||||
|
'UNSUBACK',
|
||||||
|
'PINGREQ',
|
||||||
|
'PINGRESP',
|
||||||
|
'DISCONNECT']).
|
||||||
|
|
||||||
|
-type mqtt_packet_type() :: ?RESERVED..?DISCONNECT.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Connect Return Codes
|
%% MQTT Connect Return Codes
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -62,13 +102,14 @@
|
||||||
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
|
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
|
||||||
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
|
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
-type mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH.
|
||||||
%% MQTT Erlang Types
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-type mqtt_packet_type() :: ?RESERVED..?DISCONNECT.
|
|
||||||
|
|
||||||
-type mqtt_packet_id() :: 1..16#ffff | undefined.
|
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Parser and Serialiser
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-define(MAX_LEN, 16#fffffff).
|
||||||
|
-define(HIGHBIT, 2#10000000).
|
||||||
|
-define(LOWBITS, 2#01111111).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Packet Fixed Header
|
%% MQTT Packet Fixed Header
|
||||||
|
@ -76,29 +117,31 @@
|
||||||
-record(mqtt_packet_header, {
|
-record(mqtt_packet_header, {
|
||||||
type = ?RESERVED :: mqtt_packet_type(),
|
type = ?RESERVED :: mqtt_packet_type(),
|
||||||
dup = false :: boolean(),
|
dup = false :: boolean(),
|
||||||
qos = 0 :: 0 | 1 | 2,
|
qos = ?QOS_0 :: mqtt_qos(),
|
||||||
retain = false :: boolean()}).
|
retain = false :: boolean()}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Packets
|
%% MQTT Packets
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
-type mqtt_packet_id() :: 1..16#ffff | undefined.
|
||||||
|
|
||||||
-record(mqtt_packet_connect, {
|
-record(mqtt_packet_connect, {
|
||||||
proto_ver,
|
client_id = <<>> :: binary(),
|
||||||
proto_name,
|
proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(),
|
||||||
will_retain,
|
proto_name = <<"MQTT">> :: binary(),
|
||||||
will_qos,
|
will_retain = false :: boolean(),
|
||||||
will_flag,
|
will_qos = ?QOS_0 :: mqtt_qos(),
|
||||||
clean_sess,
|
will_flag = false :: boolean(),
|
||||||
keep_alive,
|
clean_sess = false :: boolean(),
|
||||||
client_id,
|
keep_alive = 60 :: non_neg_integer(),
|
||||||
will_topic,
|
will_topic = undefined :: undefined | binary(),
|
||||||
will_msg,
|
will_msg = undefined :: undefined | binary(),
|
||||||
username,
|
username = undefined :: undefined | binary(),
|
||||||
password }).
|
password = undefined :: undefined | binary()}).
|
||||||
|
|
||||||
-record(mqtt_packet_connack, {
|
-record(mqtt_packet_connack, {
|
||||||
ack_flags = ?RESERVED,
|
ack_flags = ?RESERVED :: 0 | 1,
|
||||||
return_code }).
|
return_code :: mqtt_connack() }).
|
||||||
|
|
||||||
-record(mqtt_packet_publish, {
|
-record(mqtt_packet_publish, {
|
||||||
topic_name :: binary(),
|
topic_name :: binary(),
|
||||||
|
@ -107,17 +150,20 @@
|
||||||
-record(mqtt_packet_puback, {
|
-record(mqtt_packet_puback, {
|
||||||
packet_id :: mqtt_packet_id() }).
|
packet_id :: mqtt_packet_id() }).
|
||||||
|
|
||||||
-record(mqtt_topic, {
|
|
||||||
name :: binary(),
|
|
||||||
qos :: 0 | 1 | 2 }).
|
|
||||||
|
|
||||||
-record(mqtt_packet_subscribe, {
|
-record(mqtt_packet_subscribe, {
|
||||||
packet_id :: mqtt_packet_id(),
|
packet_id :: mqtt_packet_id(),
|
||||||
topic_table :: list(#mqtt_topic{}) }).
|
topic_table :: list({binary(), mqtt_qos()}) }).
|
||||||
|
|
||||||
|
-record(mqtt_packet_unsubscribe, {
|
||||||
|
packet_id :: mqtt_packet_id(),
|
||||||
|
topics :: list(binary()) }).
|
||||||
|
|
||||||
-record(mqtt_packet_suback, {
|
-record(mqtt_packet_suback, {
|
||||||
packet_id :: mqtt_packet_id(),
|
packet_id :: mqtt_packet_id(),
|
||||||
qos_table = [] }).
|
qos_table :: list(mqtt_qos() | 128) }).
|
||||||
|
|
||||||
|
-record(mqtt_packet_unsuback, {
|
||||||
|
packet_id :: mqtt_packet_id() }).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Control Packet
|
%% MQTT Control Packet
|
||||||
|
@ -127,9 +173,65 @@
|
||||||
variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
|
variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
|
||||||
| #mqtt_packet_publish{} | #mqtt_packet_puback{}
|
| #mqtt_packet_publish{} | #mqtt_packet_puback{}
|
||||||
| #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
|
| #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
|
||||||
| mqtt_packet_id(),
|
| #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
|
||||||
payload :: binary() }).
|
| mqtt_packet_id() | undefined,
|
||||||
|
payload :: binary() | undefined }).
|
||||||
|
|
||||||
-type mqtt_packet() :: #mqtt_packet{}.
|
-type mqtt_packet() :: #mqtt_packet{}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Packet Match
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-define(CONNECT_PACKET(Packet),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Packet}).
|
||||||
|
|
||||||
|
-define(CONNACK_PACKET(ReturnCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
|
variable = #mqtt_packet_connack{return_code = ReturnCode}}).
|
||||||
|
|
||||||
|
-define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
qos = Qos},
|
||||||
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
|
packet_id = PacketId},
|
||||||
|
payload = Payload}).
|
||||||
|
|
||||||
|
-define(PUBACK_PACKET(Type, PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = Type},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId}}).
|
||||||
|
|
||||||
|
-define(PUBREL_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId}}).
|
||||||
|
|
||||||
|
-define(SUBSCRIBE_PACKET(PacketId, TopicTable),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
topic_table = TopicTable}}).
|
||||||
|
-define(SUBACK_PACKET(PacketId, QosTable),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
|
||||||
|
variable = #mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
qos_table = QosTable}}).
|
||||||
|
-define(UNSUBSCRIBE_PACKET(PacketId, Topics),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
topics = Topics}}).
|
||||||
|
-define(UNSUBACK_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
|
||||||
|
variable = #mqtt_packet_unsuback{packet_id = PacketId}}).
|
||||||
|
|
||||||
|
-define(PACKET(Type),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Message
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_message, {
|
||||||
|
qos = ?QOS_0 :: mqtt_qos(),
|
||||||
|
retain = false :: boolean(),
|
||||||
|
dup = false :: boolean(),
|
||||||
|
msgid :: mqtt_packet_id(),
|
||||||
|
topic :: binary(),
|
||||||
|
payload :: binary()}).
|
||||||
|
|
||||||
|
-type mqtt_message() :: #mqtt_message{}.
|
||||||
|
|
|
@ -72,7 +72,7 @@ init(SockArgs = {Transport, Sock, _SockFun}) ->
|
||||||
await_recv = false,
|
await_recv = false,
|
||||||
conn_state = running,
|
conn_state = running,
|
||||||
conserve = false,
|
conserve = false,
|
||||||
parse_state = emqtt_packet:initial_state(),
|
parse_state = emqtt_parser:new(),
|
||||||
proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}),
|
proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}),
|
||||||
gen_server:enter_loop(?MODULE, [], State, 10000).
|
gen_server:enter_loop(?MODULE, [], State, 10000).
|
||||||
|
|
||||||
|
@ -164,7 +164,7 @@ process_received_bytes(Bytes,
|
||||||
State = #state{ parse_state = ParseState,
|
State = #state{ parse_state = ParseState,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
conn_name = ConnStr }) ->
|
conn_name = ConnStr }) ->
|
||||||
case emqtt_packet:parse(Bytes, ParseState) of
|
case emqtt_parser:parse(Bytes, ParseState) of
|
||||||
{more, ParseState1} ->
|
{more, ParseState1} ->
|
||||||
{noreply,
|
{noreply,
|
||||||
control_throttle( State #state{ parse_state = ParseState1 }),
|
control_throttle( State #state{ parse_state = ParseState1 }),
|
||||||
|
@ -174,7 +174,7 @@ process_received_bytes(Bytes,
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
process_received_bytes(
|
process_received_bytes(
|
||||||
Rest,
|
Rest,
|
||||||
State#state{ parse_state = emqtt_packet:initial_state(),
|
State#state{ parse_state = emqtt_parser:new(),
|
||||||
proto_state = ProtoState1 });
|
proto_state = ProtoState1 });
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||||
|
|
|
@ -26,6 +26,8 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-import(proplists, [get_value/2, get_value/3]).
|
-import(proplists, [get_value/2, get_value/3]).
|
||||||
|
|
||||||
-export([handle/1]).
|
-export([handle/1]).
|
||||||
|
|
|
@ -35,12 +35,35 @@
|
||||||
|
|
||||||
-export([dump/1]).
|
-export([dump/1]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Protocol name of version.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec protocol_name(Ver) -> Name when
|
||||||
|
Ver :: mqtt_vsn(),
|
||||||
|
Name :: binary().
|
||||||
protocol_name(Ver) when Ver =:= ?MQTT_PROTO_V31; Ver =:= ?MQTT_PROTO_V311->
|
protocol_name(Ver) when Ver =:= ?MQTT_PROTO_V31; Ver =:= ?MQTT_PROTO_V311->
|
||||||
proplists:get_value(Ver, ?PROTOCOL_NAMES).
|
proplists:get_value(Ver, ?PROTOCOL_NAMES).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Name of MQTT packet type.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec type_name(mqtt_packet_type()) -> atom().
|
||||||
type_name(Type) when Type > ?RESERVED andalso Type =< ?DISCONNECT ->
|
type_name(Type) when Type > ?RESERVED andalso Type =< ?DISCONNECT ->
|
||||||
lists:nth(Type, ?TYPE_NAMES).
|
lists:nth(Type, ?TYPE_NAMES).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Connack Name.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec connack_name(mqtt_connack()) -> atom().
|
||||||
connack_name(?CONNACK_ACCEPT) -> 'CONNACK_ACCEPT';
|
connack_name(?CONNACK_ACCEPT) -> 'CONNACK_ACCEPT';
|
||||||
connack_name(?CONNACK_PROTO_VER) -> 'CONNACK_PROTO_VER';
|
connack_name(?CONNACK_PROTO_VER) -> 'CONNACK_PROTO_VER';
|
||||||
connack_name(?CONNACK_INVALID_ID ) -> 'CONNACK_INVALID_ID';
|
connack_name(?CONNACK_INVALID_ID ) -> 'CONNACK_INVALID_ID';
|
||||||
|
@ -48,6 +71,13 @@ connack_name(?CONNACK_SERVER) -> 'CONNACK_SERVER';
|
||||||
connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS';
|
connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS';
|
||||||
connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
|
connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Dump packet.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec dump(mqtt_packet()) -> iolist().
|
||||||
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||||
dump_header(Header, dump_variable(Variable, Payload)).
|
dump_header(Header, dump_variable(Variable, Payload)).
|
||||||
|
|
||||||
|
|
|
@ -195,22 +195,21 @@ handle_packet(?SUBSCRIBE, #mqtt_packet {
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
State = #proto_state { session = Session } ) ->
|
State = #proto_state { session = Session } ) ->
|
||||||
|
|
||||||
Topics = [{Name, Qos} || #mqtt_topic{name=Name, qos=Qos} <- TopicTable],
|
{ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, TopicTable),
|
||||||
{ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, Topics),
|
|
||||||
send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
||||||
variable = #mqtt_packet_suback{ packet_id = PacketId,
|
variable = #mqtt_packet_suback{ packet_id = PacketId,
|
||||||
qos_table = GrantedQos }},
|
qos_table = GrantedQos }},
|
||||||
State#proto_state{ session = NewSession });
|
State#proto_state{ session = NewSession });
|
||||||
|
|
||||||
handle_packet(?UNSUBSCRIBE, #mqtt_packet{
|
handle_packet(?UNSUBSCRIBE, #mqtt_packet{
|
||||||
variable = #mqtt_packet_subscribe{
|
variable = #mqtt_packet_unsubscribe{
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
topic_table = Topics },
|
topics = Topics},
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
State = #proto_state{session = Session}) ->
|
State = #proto_state{session = Session}) ->
|
||||||
{ok, NewSession} = emqtt_session:unsubscribe(Session, [Name || #mqtt_topic{ name = Name } <- Topics]),
|
{ok, NewSession} = emqtt_session:unsubscribe(Session, Topics),
|
||||||
send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
||||||
variable = #mqtt_packet_suback{packet_id = PacketId }},
|
variable = #mqtt_packet_unsuback{packet_id = PacketId }},
|
||||||
State#proto_state { session = NewSession } );
|
State#proto_state { session = NewSession } );
|
||||||
|
|
||||||
handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
|
handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
|
||||||
|
@ -249,7 +248,7 @@ send_message({_From, Message = #mqtt_message{ qos = Qos }}, State = #proto_state
|
||||||
|
|
||||||
send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
||||||
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
||||||
Data = emqtt_packet:serialise(Packet),
|
Data = emqtt_serialiser:serialise(Packet),
|
||||||
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
|
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
|
||||||
Transport:send(Sock, Data),
|
Transport:send(Sock, Data),
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -341,7 +340,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
|
||||||
{error, empty_topics};
|
{error, empty_topics};
|
||||||
|
|
||||||
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
||||||
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
||||||
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
|
|
|
@ -28,6 +28,8 @@
|
||||||
|
|
||||||
-include("emqtt_topic.hrl").
|
-include("emqtt_topic.hrl").
|
||||||
|
|
||||||
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-module(emqtt_queue).
|
-module(emqtt_queue).
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-export([new/1, new/2, in/3, all/1, clear/1]).
|
-export([new/1, new/2, in/3, all/1, clear/1]).
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,6 @@
|
||||||
-export([serialise/1]).
|
-export([serialise/1]).
|
||||||
|
|
||||||
|
|
||||||
%%TODO: doc and spec...
|
|
||||||
|
|
||||||
serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
serialise(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
||||||
variable = Variable,
|
variable = Variable,
|
||||||
payload = Payload}) ->
|
payload = Payload}) ->
|
||||||
|
@ -100,7 +98,7 @@ serialise_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
|
||||||
serialise_variable(?SUBACK, #mqtt_packet_suback {packet_id = PacketId,
|
serialise_variable(?SUBACK, #mqtt_packet_suback {packet_id = PacketId,
|
||||||
qos_table = QosTable},
|
qos_table = QosTable},
|
||||||
undefined)
|
undefined) ->
|
||||||
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
|
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
|
||||||
|
|
||||||
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{
|
serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{
|
||||||
|
@ -108,7 +106,7 @@ serialise_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{
|
||||||
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
{<<PacketId:16/big>>, serialise_topics(Topics)};
|
||||||
|
|
||||||
serialise_variable(?UNSUBACK, #mqtt_packet_suback {packet_id = PacketId},
|
serialise_variable(?UNSUBACK, #mqtt_packet_suback {packet_id = PacketId},
|
||||||
undefined)
|
undefined) ->
|
||||||
{<<PacketId:16/big>>, <<>>};
|
{<<PacketId:16/big>>, <<>>};
|
||||||
|
|
||||||
serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName,
|
serialise_variable(?PUBLISH, #mqtt_packet_publish { topic_name = TopicName,
|
||||||
|
@ -157,4 +155,3 @@ opt(false) -> 0;
|
||||||
opt(true) -> 1;
|
opt(true) -> 1;
|
||||||
opt(X) when is_integer(X) -> X;
|
opt(X) when is_integer(X) -> X;
|
||||||
opt(B) when is_binary(B) -> 1.
|
opt(B) when is_binary(B) -> 1.
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqtt_server).
|
-module(emqtt_server).
|
||||||
|
|
||||||
-author('feng@slimpp.io').
|
-author('feng@slimpp.io').
|
||||||
|
@ -28,6 +27,8 @@
|
||||||
|
|
||||||
-include("emqtt_topic.hrl").
|
-include("emqtt_topic.hrl").
|
||||||
|
|
||||||
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
Loading…
Reference in New Issue