diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 53350ed6d..0d7d4c134 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ %% Banner %%-------------------------------------------------------------------- --define(COPYRIGHT, "Copyright (C) 2012-2017, Feng Lee "). +-define(COPYRIGHT, "Copyright (c) 2013-2017 EMQ Enterprise, Inc."). -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0"). @@ -48,21 +48,21 @@ %% MQTT Topic %%-------------------------------------------------------------------- --record(mqtt_topic, { - topic :: binary(), - flags = [] :: [retained | static] -}). +-record(mqtt_topic, + { topic :: binary(), + flags = [] :: [retained | static] + }). -type(mqtt_topic() :: #mqtt_topic{}). %%-------------------------------------------------------------------- %% MQTT Subscription %%-------------------------------------------------------------------- --record(mqtt_subscription, { - subid :: binary() | atom(), - topic :: binary(), - qos :: 0 | 1 | 2 -}). +-record(mqtt_subscription, + { subid :: binary() | atom(), + topic :: binary(), + qos :: 0 | 1 | 2 + }). -type(mqtt_subscription() :: #mqtt_subscription{}). @@ -73,18 +73,18 @@ -type(ws_header_key() :: atom() | binary() | string()). -type(ws_header_val() :: atom() | binary() | string() | integer()). --record(mqtt_client, { - client_id :: binary() | undefined, - client_pid :: pid(), - username :: binary() | undefined, - peername :: {inet:ip_address(), integer()}, - clean_sess :: boolean(), - proto_ver :: 3 | 4, - keepalive = 0, - will_topic :: undefined | binary(), - ws_initial_headers :: list({ws_header_key(), ws_header_val()}), - connected_at :: erlang:timestamp() -}). +-record(mqtt_client, + { client_id :: binary() | undefined, + client_pid :: pid(), + username :: binary() | undefined, + peername :: {inet:ip_address(), inet:port_number()}, + clean_sess :: boolean(), + proto_ver :: 3 | 4, + keepalive = 0, + will_topic :: undefined | binary(), + ws_initial_headers :: list({ws_header_key(), ws_header_val()}), + connected_at :: erlang:timestamp() + }). -type(mqtt_client() :: #mqtt_client{}). @@ -92,33 +92,46 @@ %% MQTT Session %%-------------------------------------------------------------------- --record(mqtt_session, { - client_id :: binary(), - sess_pid :: pid(), - persistent :: boolean() -}). +-record(mqtt_session, + { client_id :: binary(), + sess_pid :: pid(), + clean_sess :: boolean() + }). -type(mqtt_session() :: #mqtt_session{}). %%-------------------------------------------------------------------- %% MQTT Message %%-------------------------------------------------------------------- + -type(mqtt_msgid() :: binary() | undefined). + -type(mqtt_pktid() :: 1..16#ffff | undefined). --record(mqtt_message, { - id :: mqtt_msgid(), %% Global unique message ID - pktid :: mqtt_pktid(), %% PacketId - from :: {binary(), undefined | binary()}, %% ClientId and Username - topic :: binary(), %% Topic that the message is published to - qos = 0 :: 0 | 1 | 2, %% Message QoS - flags = [] :: [retain | dup | sys], %% Message Flags - retain = false :: boolean(), %% Retain flag - dup = false :: boolean(), %% Dup flag - sys = false :: boolean(), %% $SYS flag - headers = [] :: list(), - payload :: binary(), %% Payload - timestamp :: pos_integer() %% os:timestamp to seconds +-record(mqtt_message, + { %% Global unique message ID + id :: mqtt_msgid(), + %% PacketId + pktid :: mqtt_pktid(), + %% ClientId and Username + from :: {binary(), undefined | binary()}, + %% Topic that the message is published to + topic :: binary(), + %% Message QoS + qos = 0 :: 0 | 1 | 2, + %% Message Flags + flags = [] :: [retain | dup | sys], + %% Retain flag + retain = false :: boolean(), + %% Dup flag + dup = false :: boolean(), + %% $SYS flag + sys = false :: boolean(), + headers = [] :: list(), + %% Payload + payload :: binary(), + %% Timestamp + timestamp :: erlang:timestamp() }). -type(mqtt_message() :: #mqtt_message{}). @@ -126,46 +139,45 @@ %%-------------------------------------------------------------------- %% MQTT Delivery %%-------------------------------------------------------------------- --record(mqtt_delivery, { - sender :: pid(), %% Pid of the sender/publisher - message :: mqtt_message(), %% Message - flows :: list() -}). + +-record(mqtt_delivery, + { sender :: pid(), %% Pid of the sender/publisher + message :: mqtt_message(), %% Message + flows :: list() + }). -type(mqtt_delivery() :: #mqtt_delivery{}). %%-------------------------------------------------------------------- %% MQTT Route %%-------------------------------------------------------------------- --record(mqtt_route, { - topic :: binary(), - node :: node() -}). + +-record(mqtt_route, + { topic :: binary(), + node :: node() + }). -type(mqtt_route() :: #mqtt_route{}). %%-------------------------------------------------------------------- %% MQTT Alarm %%-------------------------------------------------------------------- --record(mqtt_alarm, { - id :: binary(), - severity :: warning | error | critical, - title :: iolist() | binary(), - summary :: iolist() | binary(), - timestamp :: erlang:timestamp() %% Timestamp -}). + +-record(mqtt_alarm, + { id :: binary(), + severity :: warning | error | critical, + title :: iolist() | binary(), + summary :: iolist() | binary(), + timestamp :: erlang:timestamp() %% Timestamp + }). -type(mqtt_alarm() :: #mqtt_alarm{}). %%-------------------------------------------------------------------- %% MQTT Plugin %%-------------------------------------------------------------------- --record(mqtt_plugin, { - name, - version, - descr, - active = false -}). + +-record(mqtt_plugin, { name, version, descr, active = false }). -type(mqtt_plugin() :: #mqtt_plugin{}). @@ -173,14 +185,8 @@ %% MQTT CLI Command %% For example: 'broker metrics' %%-------------------------------------------------------------------- --record(mqtt_cli, { - name, - action, - args = [], - opts = [], - usage, - descr -}). + +-record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }). -type(mqtt_cli() :: #mqtt_cli{}). diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl index 461306747..bda88d801 100644 --- a/include/emqttd_cli.hrl +++ b/include/emqttd_cli.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 06102e26a..ec5fb3e73 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 9ed3a993b..ab0650ead 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,23 +14,32 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT Protocol Header +%%-------------------------------------------------------------------- +%% MQTT SockOpts +%%-------------------------------------------------------------------- + +-define(MQTT_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, + {backlog, 512}, {nodelay, true}]). %%-------------------------------------------------------------------- %% MQTT Protocol Version and Levels %%-------------------------------------------------------------------- --define(MQTT_PROTO_V31, 3). --define(MQTT_PROTO_V311, 4). + +-define(MQTT_PROTO_V3, 3). +-define(MQTT_PROTO_V4, 4). +-define(MQTT_PROTO_V5, 5). -define(PROTOCOL_NAMES, [ - {?MQTT_PROTO_V31, <<"MQIsdp">>}, - {?MQTT_PROTO_V311, <<"MQTT">>}]). + {?MQTT_PROTO_V3, <<"MQIsdp">>}, + {?MQTT_PROTO_V4, <<"MQTT">>}, + {?MQTT_PROTO_V5, <<"MQTT">>}]). --type(mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311). +-type(mqtt_vsn() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). %%-------------------------------------------------------------------- -%% MQTT QoS +%% MQTT QoS Level %%-------------------------------------------------------------------- + -define(QOS_0, 0). %% At most once -define(QOS_1, 1). %% At least once -define(QOS_2, 2). %% Exactly once @@ -63,28 +72,30 @@ end). %%-------------------------------------------------------------------- -%% Max ClientId Length. Why 1024? NiDongDe... +%% Max ClientId Length. Why 1024? %%-------------------------------------------------------------------- + -define(MAX_CLIENTID_LEN, 1024). %%-------------------------------------------------------------------- %% MQTT Control Packet Types %%-------------------------------------------------------------------- --define(RESERVED, 0). %% Reserved --define(CONNECT, 1). %% Client request to connect to Server --define(CONNACK, 2). %% Server to Client: Connect acknowledgment --define(PUBLISH, 3). %% Publish message --define(PUBACK, 4). %% Publish acknowledgment --define(PUBREC, 5). %% Publish received (assured delivery part 1) --define(PUBREL, 6). %% Publish release (assured delivery part 2) --define(PUBCOMP, 7). %% Publish complete (assured delivery part 3) --define(SUBSCRIBE, 8). %% Client subscribe request --define(SUBACK, 9). %% Server Subscribe acknowledgment --define(UNSUBSCRIBE, 10). %% Unsubscribe request --define(UNSUBACK, 11). %% Unsubscribe acknowledgment --define(PINGREQ, 12). %% PING request --define(PINGRESP, 13). %% PING response --define(DISCONNECT, 14). %% Client is disconnecting +-define(RESERVED, 0). %% Reserved +-define(CONNECT, 1). %% Client request to connect to Server +-define(CONNACK, 2). %% Server to Client: Connect acknowledgment +-define(PUBLISH, 3). %% Publish message +-define(PUBACK, 4). %% Publish acknowledgment +-define(PUBREC, 5). %% Publish received (assured delivery part 1) +-define(PUBREL, 6). %% Publish release (assured delivery part 2) +-define(PUBCOMP, 7). %% Publish complete (assured delivery part 3) +-define(SUBSCRIBE, 8). %% Client subscribe request +-define(SUBACK, 9). %% Server Subscribe acknowledgment +-define(UNSUBSCRIBE, 10). %% Unsubscribe request +-define(UNSUBACK, 11). %% Unsubscribe acknowledgment +-define(PINGREQ, 12). %% PING request +-define(PINGRESP, 13). %% PING response +-define(DISCONNECT, 14). %% Client is disconnecting +-define(AUTH, 15). %% Authentication exchange -define(TYPE_NAMES, [ 'CONNECT', @@ -100,25 +111,28 @@ 'UNSUBACK', 'PINGREQ', 'PINGRESP', - 'DISCONNECT']). + 'DISCONNECT', + 'AUTH']). -type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT). %%-------------------------------------------------------------------- %% MQTT Connect Return Codes %%-------------------------------------------------------------------- --define(CONNACK_ACCEPT, 0). %% Connection accepted --define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version --define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server --define(CONNACK_SERVER, 3). %% Server unavailable --define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed --define(CONNACK_AUTH, 5). %% Client is not authorized to connect + +-define(CONNACK_ACCEPT, 0). %% Connection accepted +-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version +-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server +-define(CONNACK_SERVER, 3). %% Server unavailable +-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed +-define(CONNACK_AUTH, 5). %% Client is not authorized to connect -type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH). %%-------------------------------------------------------------------- %% MQTT Parser and Serializer %%-------------------------------------------------------------------- + -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -126,76 +140,87 @@ %%-------------------------------------------------------------------- %% MQTT Packet Fixed Header %%-------------------------------------------------------------------- + -record(mqtt_packet_header, { - type = ?RESERVED :: mqtt_packet_type(), - dup = false :: boolean(), - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean()}). + type = ?RESERVED :: mqtt_packet_type(), + dup = false :: boolean(), + qos = ?QOS_0 :: mqtt_qos(), + retain = false :: boolean()}). %%-------------------------------------------------------------------- %% MQTT Packets %%-------------------------------------------------------------------- --type(mqtt_client_id() :: binary()). --type(mqtt_username() :: binary() | undefined). --type(mqtt_packet_id() :: 1..16#ffff | undefined). --record(mqtt_packet_connect, { - client_id = <<>> :: mqtt_client_id(), - proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), - proto_name = <<"MQTT">> :: binary(), - will_retain = false :: boolean(), - will_qos = ?QOS_0 :: mqtt_qos(), - will_flag = false :: boolean(), - clean_sess = false :: boolean(), - keep_alive = 60 :: non_neg_integer(), - will_topic = undefined :: undefined | binary(), - will_msg = undefined :: undefined | binary(), - username = undefined :: undefined | binary(), - password = undefined :: undefined | binary()}). +-type(mqtt_client_id() :: binary()). +-type(mqtt_username() :: binary() | undefined). +-type(mqtt_packet_id() :: 1..16#ffff | undefined). --record(mqtt_packet_connack, { - ack_flags = ?RESERVED :: 0 | 1, - return_code :: mqtt_connack() }). +-record(mqtt_packet_connect, + { client_id = <<>> :: mqtt_client_id(), + proto_ver = ?MQTT_PROTO_V4 :: mqtt_vsn(), + proto_name = <<"MQTT">> :: binary(), + will_retain = false :: boolean(), + will_qos = ?QOS_0 :: mqtt_qos(), + will_flag = false :: boolean(), + clean_sess = false :: boolean(), + keep_alive = 60 :: non_neg_integer(), + will_topic = undefined :: undefined | binary(), + will_msg = undefined :: undefined | binary(), + username = undefined :: undefined | binary(), + password = undefined :: undefined | binary() + }). --record(mqtt_packet_publish, { - topic_name :: binary(), - packet_id :: mqtt_packet_id() }). +-record(mqtt_packet_connack, + { ack_flags = ?RESERVED :: 0 | 1, + return_code :: mqtt_connack() + }). --record(mqtt_packet_puback, { - packet_id :: mqtt_packet_id() }). +-record(mqtt_packet_publish, + { topic_name :: binary(), + packet_id :: mqtt_packet_id() + }). --record(mqtt_packet_subscribe, { - packet_id :: mqtt_packet_id(), - topic_table :: list({binary(), mqtt_qos()}) }). +-record(mqtt_packet_puback, + { packet_id :: mqtt_packet_id() }). --record(mqtt_packet_unsubscribe, { - packet_id :: mqtt_packet_id(), - topics :: list(binary()) }). +-record(mqtt_packet_subscribe, + { packet_id :: mqtt_packet_id(), + topic_table :: list({binary(), mqtt_qos()}) + }). --record(mqtt_packet_suback, { - packet_id :: mqtt_packet_id(), - qos_table :: list(mqtt_qos() | 128) }). +-record(mqtt_packet_unsubscribe, + { packet_id :: mqtt_packet_id(), + topics :: list(binary()) + }). --record(mqtt_packet_unsuback, { - packet_id :: mqtt_packet_id() }). +-record(mqtt_packet_suback, + { packet_id :: mqtt_packet_id(), + qos_table :: list(mqtt_qos() | 128) + }). + +-record(mqtt_packet_unsuback, + { packet_id :: mqtt_packet_id() }). %%-------------------------------------------------------------------- %% MQTT Control Packet %%-------------------------------------------------------------------- --record(mqtt_packet, { - header :: #mqtt_packet_header{}, - variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} - | #mqtt_packet_publish{} | #mqtt_packet_puback{} - | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} - | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} - | mqtt_packet_id() | undefined, - payload :: binary() | undefined }). --type mqtt_packet() :: #mqtt_packet{}. +-record(mqtt_packet, + { header :: #mqtt_packet_header{}, + variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} + | #mqtt_packet_publish{} | #mqtt_packet_puback{} + | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} + | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} + | mqtt_packet_id() | undefined, + payload :: binary() | undefined + }). + +-type(mqtt_packet() :: #mqtt_packet{}). %%-------------------------------------------------------------------- %% MQTT Packet Match %%-------------------------------------------------------------------- + -define(CONNECT_PACKET(Var), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}). diff --git a/include/emqttd_trie.hrl b/include/emqttd_trie.hrl index bd4184ad8..eb4e1390d 100644 --- a/include/emqttd_trie.hrl +++ b/include/emqttd_trie.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2016-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,20 +16,20 @@ -type(trie_node_id() :: binary() | atom()). --record(trie_node, { - node_id :: trie_node_id(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined, - flags :: [retained | static] -}). +-record(trie_node, + { node_id :: trie_node_id(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() | undefined, + flags :: [retained | static] + }). --record(trie_edge, { - node_id :: trie_node_id(), - word :: binary() | atom() -}). +-record(trie_edge, + { node_id :: trie_node_id(), + word :: binary() | atom() + }). --record(trie, { - edge :: #trie_edge{}, - node_id :: trie_node_id() -}). +-record(trie, + { edge :: #trie_edge{}, + node_id :: trie_node_id() + }).