From 53572b7d376a70b28ce61e7a91328a51b6fb0afb Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 24 Feb 2017 20:38:35 +0800 Subject: [PATCH 01/16] Fix the comments of mqtt.session.* configurations --- etc/emq.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 5f43eff51..196ea99f3 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -130,14 +130,14 @@ mqtt.client.enable_stats = off ## Upgrade QoS? mqtt.session.upgrade_qos = off -## Max number of QoS 1 and 2 messages that can be “inflight” at one time. +## Max Size of the Inflight Window for QoS1 and QoS2 messages ## 0 means no limit mqtt.session.max_inflight = 32 ## Retry Interval for redelivering QoS1/2 messages. mqtt.session.retry_interval = 20s -## Max Packets that Awaiting PUBREL, 0 means no limit +## Client -> Broker: Max Packets Awaiting PUBREL, 0 means no limit mqtt.session.max_awaiting_rel = 100 ## Awaiting PUBREL Timeout From 3d0bc15e93cfb7ed943472a2335a9de6bde3d55c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:06:15 +0800 Subject: [PATCH 02/16] Use the '?assertEqual' macro and update Copyright info --- test/emqttd_SUITE.erl | 2 +- test/emqttd_access_SUITE.erl | 2 +- test/emqttd_acl_test_mod.erl | 2 +- test/emqttd_auth_anonymous_test_mod.erl | 2 +- test/emqttd_auth_dashboard.erl | 2 +- test/emqttd_inflight_SUITE.erl | 1 + test/emqttd_lib_SUITE.erl | 2 +- test/emqttd_mod_SUITE.erl | 2 +- test/emqttd_mqueue_SUITE.erl | 2 +- test/emqttd_net_SUITE.erl | 2 +- test/emqttd_protocol_SUITE.erl | 51 ++++++++++++------------- test/emqttd_topic_SUITE.erl | 2 +- test/emqttd_trie_SUITE.erl | 2 +- test/emqttd_vm_SUITE.erl | 2 +- 14 files changed, 38 insertions(+), 38 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index afa1a1f06..0ffef33ec 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index 5ab7e992b..762ae6f40 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_acl_test_mod.erl b/test/emqttd_acl_test_mod.erl index 196337fa4..08f1f9c94 100644 --- a/test/emqttd_acl_test_mod.erl +++ b/test/emqttd_acl_test_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_auth_anonymous_test_mod.erl b/test/emqttd_auth_anonymous_test_mod.erl index 8e93be0bc..be6a14bf8 100644 --- a/test/emqttd_auth_anonymous_test_mod.erl +++ b/test/emqttd_auth_anonymous_test_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_auth_dashboard.erl b/test/emqttd_auth_dashboard.erl index 0e509c08a..49f54c377 100644 --- a/test/emqttd_auth_dashboard.erl +++ b/test/emqttd_auth_dashboard.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_inflight_SUITE.erl b/test/emqttd_inflight_SUITE.erl index 5a87d056d..de5391f1a 100644 --- a/test/emqttd_inflight_SUITE.erl +++ b/test/emqttd_inflight_SUITE.erl @@ -48,3 +48,4 @@ t_is_full(_) -> t_is_empty(_) -> Inflight = ((emqttd_inflight:new(1)):insert(k, v1)), ?assertNot(Inflight:is_empty()). + diff --git a/test/emqttd_lib_SUITE.erl b/test/emqttd_lib_SUITE.erl index c16858785..344e185d0 100644 --- a/test/emqttd_lib_SUITE.erl +++ b/test/emqttd_lib_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_mod_SUITE.erl b/test/emqttd_mod_SUITE.erl index a258eabe0..1fcf455d0 100644 --- a/test/emqttd_mod_SUITE.erl +++ b/test/emqttd_mod_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2016-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_mqueue_SUITE.erl b/test/emqttd_mqueue_SUITE.erl index eaa0ecc68..e56b08398 100644 --- a/test/emqttd_mqueue_SUITE.erl +++ b/test/emqttd_mqueue_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_net_SUITE.erl b/test/emqttd_net_SUITE.erl index 7d70f4291..e3ae0700c 100644 --- a/test/emqttd_net_SUITE.erl +++ b/test/emqttd_net_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2016-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_protocol_SUITE.erl b/test/emqttd_protocol_SUITE.erl index 027d550a0..1ac688650 100644 --- a/test/emqttd_protocol_SUITE.erl +++ b/test/emqttd_protocol_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -260,7 +260,7 @@ serialize_connect(_) -> serialize_connack(_) -> ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}}, - <<32,2,0,0>> = iolist_to_binary(serialize(ConnAck)). + ?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(ConnAck))). serialize_publish(_) -> serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), @@ -303,20 +303,20 @@ long_payload() -> %%-------------------------------------------------------------------- packet_proto_name(_) -> - <<"MQIsdp">> = emqttd_packet:protocol_name(3), - <<"MQTT">> = emqttd_packet:protocol_name(4). + ?assertEqual(<<"MQIsdp">>, emqttd_packet:protocol_name(3)), + ?assertEqual(<<"MQTT">>, emqttd_packet:protocol_name(4)). packet_type_name(_) -> - 'CONNECT' = emqttd_packet:type_name(?CONNECT), - 'UNSUBSCRIBE' = emqttd_packet:type_name(?UNSUBSCRIBE). + ?assertEqual('CONNECT', emqttd_packet:type_name(?CONNECT)), + ?assertEqual('UNSUBSCRIBE', emqttd_packet:type_name(?UNSUBSCRIBE)). packet_connack_name(_) -> - 'CONNACK_ACCEPT' = emqttd_packet:connack_name(?CONNACK_ACCEPT), - 'CONNACK_PROTO_VER' = emqttd_packet:connack_name(?CONNACK_PROTO_VER), - 'CONNACK_INVALID_ID' = emqttd_packet:connack_name(?CONNACK_INVALID_ID), - 'CONNACK_SERVER' = emqttd_packet:connack_name(?CONNACK_SERVER), - 'CONNACK_CREDENTIALS' = emqttd_packet:connack_name(?CONNACK_CREDENTIALS), - 'CONNACK_AUTH' = emqttd_packet:connack_name(?CONNACK_AUTH). + ?assertEqual('CONNACK_ACCEPT', emqttd_packet:connack_name(?CONNACK_ACCEPT)), + ?assertEqual('CONNACK_PROTO_VER', emqttd_packet:connack_name(?CONNACK_PROTO_VER)), + ?assertEqual('CONNACK_INVALID_ID', emqttd_packet:connack_name(?CONNACK_INVALID_ID)), + ?assertEqual('CONNACK_SERVER', emqttd_packet:connack_name(?CONNACK_SERVER)), + ?assertEqual('CONNACK_CREDENTIALS', emqttd_packet:connack_name(?CONNACK_CREDENTIALS)), + ?assertEqual('CONNACK_AUTH', emqttd_packet:connack_name(?CONNACK_AUTH)). packet_format(_) -> io:format("~s", [emqttd_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), @@ -336,26 +336,25 @@ packet_format(_) -> message_make(_) -> Msg = emqttd_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - 0 = Msg#mqtt_message.qos, + ?assertEqual(0, Msg#mqtt_message.qos), Msg1 = emqttd_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), - true = is_binary(Msg1#mqtt_message.id), - 2 = Msg1#mqtt_message.qos. + ?assert(is_binary(Msg1#mqtt_message.id)), + ?assertEqual(2, Msg1#mqtt_message.qos). message_from_packet(_) -> Msg = emqttd_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), - 1 = Msg#mqtt_message.qos, - 10 = Msg#mqtt_message.pktid, - <<"topic">> = Msg#mqtt_message.topic, - + ?assertEqual(1, Msg#mqtt_message.qos), + ?assertEqual(10, Msg#mqtt_message.packet_id), + ?assertEqual(<<"topic">>, Msg#mqtt_message.topic), WillMsg = emqttd_message:from_packet(#mqtt_packet_connect{will_flag = true, will_topic = <<"WillTopic">>, will_msg = <<"WillMsg">>}), - <<"WillTopic">> = WillMsg#mqtt_message.topic, - <<"WillMsg">> = WillMsg#mqtt_message.payload, + ?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic), + ?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload), Msg2 = emqttd_message:from_packet(<<"username">>, <<"clientid">>, ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), - {<<"clientid">>, <<"username">>} = Msg2#mqtt_message.from, + ?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from), io:format("~s", [emqttd_message:format(Msg2)]). message_flag(_) -> @@ -363,13 +362,13 @@ message_flag(_) -> Msg2 = emqttd_message:from_packet(<<"clientid">>, Pkt), Msg3 = emqttd_message:set_flag(retain, Msg2), Msg4 = emqttd_message:set_flag(dup, Msg3), - true = Msg4#mqtt_message.dup, - true = Msg4#mqtt_message.retain, + ?assert(Msg4#mqtt_message.dup), + ?assert(Msg4#mqtt_message.retain), Msg5 = emqttd_message:set_flag(Msg4), Msg6 = emqttd_message:unset_flag(dup, Msg5), Msg7 = emqttd_message:unset_flag(retain, Msg6), - false = Msg7#mqtt_message.dup, - false = Msg7#mqtt_message.retain, + ?assertNot(Msg7#mqtt_message.dup), + ?assertNot(Msg7#mqtt_message.retain), emqttd_message:unset_flag(Msg7), emqttd_message:to_packet(Msg7). diff --git a/test/emqttd_topic_SUITE.erl b/test/emqttd_topic_SUITE.erl index f8be7df1a..a43875fba 100644 --- a/test/emqttd_topic_SUITE.erl +++ b/test/emqttd_topic_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2016-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_trie_SUITE.erl b/test/emqttd_trie_SUITE.erl index 37d247755..2394a902a 100644 --- a/test/emqttd_trie_SUITE.erl +++ b/test/emqttd_trie_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/test/emqttd_vm_SUITE.erl b/test/emqttd_vm_SUITE.erl index ba56ef97f..ef0ac2946 100644 --- a/test/emqttd_vm_SUITE.erl +++ b/test/emqttd_vm_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. From 98e15ebbf4e3799d7ef9c6126e9ea7eb4df11f26 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:11:09 +0800 Subject: [PATCH 03/16] Prepare for MQTT/5.0 --- include/emqttd.hrl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index a65086858..a02c86129 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -22,9 +22,9 @@ -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0"). --define(PROTOCOL_VERSION, "MQTT/3.1.1"). +-define(PROTOCOL_VERSION, "MQTT/5.0"). --define(ERTS_MINIMUM, "7.0"). +-define(ERTS_MINIMUM, "8.0"). %%-------------------------------------------------------------------- %% Sys/Queue/Share Topics' Prefix @@ -42,7 +42,7 @@ -type(pubsub() :: publish | subscribe). --define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). +-define(PS(PS), (PS =:= publish orelse PS =:= subscribe)). %%-------------------------------------------------------------------- %% MQTT Topic @@ -172,7 +172,7 @@ severity :: warning | error | critical, title :: iolist() | binary(), summary :: iolist() | binary(), - timestamp :: erlang:timestamp() %% Timestamp + timestamp :: erlang:timestamp() }). -type(mqtt_alarm() :: #mqtt_alarm{}). @@ -186,8 +186,7 @@ -type(mqtt_plugin() :: #mqtt_plugin{}). %%-------------------------------------------------------------------- -%% MQTT CLI Command -%% For example: 'broker metrics' +%% MQTT CLI Command. For example: 'broker metrics' %%-------------------------------------------------------------------- -record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }). From c3919a64e60582ed4a7b024fe1380fdd2ab7b476 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:13:07 +0800 Subject: [PATCH 04/16] Format 'trie_node' record --- include/emqttd_trie.hrl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/emqttd_trie.hrl b/include/emqttd_trie.hrl index eb4e1390d..c5b9d03c7 100644 --- a/include/emqttd_trie.hrl +++ b/include/emqttd_trie.hrl @@ -17,10 +17,10 @@ -type(trie_node_id() :: binary() | atom()). -record(trie_node, - { node_id :: trie_node_id(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined, - flags :: [retained | static] + { node_id :: trie_node_id(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() | undefined, + flags :: [retained | static] }). -record(trie_edge, From 35e08df735a9d78af2fe303bfce55da186d66870 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:17:07 +0800 Subject: [PATCH 05/16] Change the default QoS of will message to QOS_1 --- include/emqttd_protocol.hrl | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 6181b9a65..9d9cec714 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -80,6 +80,7 @@ %%-------------------------------------------------------------------- %% MQTT Control Packet Types %%-------------------------------------------------------------------- + -define(RESERVED, 0). %% Reserved -define(CONNECT, 1). %% Client request to connect to Server -define(CONNACK, 2). %% Server to Client: Connect acknowledgment @@ -94,7 +95,7 @@ -define(UNSUBACK, 11). %% Unsubscribe acknowledgment -define(PINGREQ, 12). %% PING request -define(PINGRESP, 13). %% PING response --define(DISCONNECT, 14). %% Client is disconnecting +-define(DISCONNECT, 14). %% Client or Server is disconnecting -define(AUTH, 15). %% Authentication exchange -define(TYPE_NAMES, [ @@ -146,11 +147,12 @@ %% MQTT Packet Fixed Header %%-------------------------------------------------------------------- --record(mqtt_packet_header, { - type = ?RESERVED :: mqtt_packet_type(), - dup = false :: boolean(), - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean()}). +-record(mqtt_packet_header, + { type = ?RESERVED :: mqtt_packet_type(), + dup = false :: boolean(), + qos = ?QOS_0 :: mqtt_qos(), + retain = false :: boolean() + }). %%-------------------------------------------------------------------- %% MQTT Packets @@ -165,7 +167,7 @@ proto_ver = ?MQTT_PROTO_V4 :: mqtt_vsn(), proto_name = <<"MQTT">> :: binary(), will_retain = false :: boolean(), - will_qos = ?QOS_0 :: mqtt_qos(), + will_qos = ?QOS_1 :: mqtt_qos(), will_flag = false :: boolean(), clean_sess = false :: boolean(), keep_alive = 60 :: non_neg_integer(), @@ -199,25 +201,25 @@ }). -record(mqtt_packet_suback, - { packet_id :: mqtt_packet_id(), - qos_table :: list(mqtt_qos() | 128) + { packet_id :: mqtt_packet_id(), + qos_table :: list(mqtt_qos() | 128) }). -record(mqtt_packet_unsuback, - { packet_id :: mqtt_packet_id() }). + { packet_id :: mqtt_packet_id() }). %%-------------------------------------------------------------------- %% MQTT Control Packet %%-------------------------------------------------------------------- -record(mqtt_packet, - { header :: #mqtt_packet_header{}, - variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} - | #mqtt_packet_publish{} | #mqtt_packet_puback{} - | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} - | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} - | mqtt_packet_id() | undefined, - payload :: binary() | undefined + { header :: #mqtt_packet_header{}, + variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} + | #mqtt_packet_publish{} | #mqtt_packet_puback{} + | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} + | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} + | mqtt_packet_id() | undefined, + payload :: binary() | undefined }). -type(mqtt_packet() :: #mqtt_packet{}). From 99c83dbe21138f20a4b969513dbacb6a4134afc0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:28:16 +0800 Subject: [PATCH 06/16] Add 'mqtt.listener.tcp.tune_buffer' config --- priv/emq.schema | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/priv/emq.schema b/priv/emq.schema index 92ca118d2..b5f40b893 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -132,14 +132,14 @@ end}. %% @doc http://www.erlang.org/doc/man/kernel_app.html {mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ - {commented, 6000}, + {commented, 6369}, {datatype, integer}, hidden ]}. %% @see node.dist_listen_min {mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ - {commented, 6999}, + {commented, 6369}, {datatype, integer}, hidden ]}. @@ -356,6 +356,7 @@ end}. {default, off}, {datatype, flag} ]}. + %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. %% 0 means no limit {mapping, "mqtt.session.max_inflight", "emqttd.session", [ @@ -571,6 +572,11 @@ end}. hidden ]}. +{mapping, "mqtt.listener.tcp.tune_buffer", "emqttd.listeners", [ + {default, off}, + {datatype, flag} +]}. + {mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ {datatype, {enum, [true, false]}}, hidden @@ -684,8 +690,8 @@ end}. LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) - end, + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}]) + end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, From dae3d22bef2dd3eb6160b8aebefce0dfbfe1930d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:43:44 +0800 Subject: [PATCH 07/16] Remove the io:format line --- src/emqttd_sm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 02dfdb879..bbba50d6a 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -107,7 +107,7 @@ dispatch(ClientId, Topic, Msg) -> try ets:lookup_element(mqtt_local_session, ClientId, 2) of Pid -> Pid ! {dispatch, Topic, Msg} catch - error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How?? + error:badarg -> ok %%FIXME Later. end. call(SM, Req) -> From f2a818a4a4823e3720295a22adefa53e11349328 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:46:09 +0800 Subject: [PATCH 08/16] Rename the PUBSUB macro to PS --- src/emqttd_access_control.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index cc60a57f2..65d0c76f5 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -70,7 +70,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary()). -check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) -> +check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> case lookup_mods(acl) of [] -> case emqttd:env(allow_anonymous, false) of true -> allow; From 14d28d59bd957382b46f5e1d46fdcb8fb5b0c370 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:47:50 +0800 Subject: [PATCH 09/16] Rename the 'Timestamp' variable to 'TS' --- src/emqttd_alarm.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index f968db30e..1467797c7 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -87,14 +87,14 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, severity = Severity, title = Title, summary = Summary}}, Alarms)-> - Timestamp = os:timestamp(), + TS = os:timestamp(), Json = mochijson2:encode([{id, AlarmId}, {severity, Severity}, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, - {ts, emqttd_time:now_secs(Timestamp)}]), + {ts, emqttd_time:now_secs(TS)}]), emqttd:publish(alarm_msg(alert, AlarmId, Json)), - {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]}; + {ok, [Alarm#mqtt_alarm{timestamp = TS} | Alarms]}; handle_event({clear_alarm, AlarmId}, Alarms) -> Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_time:now_secs()}]), From 7c90e08f57f53a948d22ef4154755cd4822ae587 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:48:29 +0800 Subject: [PATCH 10/16] Fix the subscrptions print --- src/emqttd_cli.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index cb4438cc8..54d462e65 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -537,20 +537,21 @@ print({Topic, Node}) -> ?PRINT("~s -> ~s~n", [Topic, Node]); print({ClientId, _ClientPid, _Persistent, SessInfo}) -> + Data = lists:append(SessInfo, emqttd_stats:get_session_stats(ClientId)), InfoKeys = [clean_sess, + subscriptions, max_inflight, - inflight_queue, - message_queue, - message_dropped, - awaiting_rel, - awaiting_ack, - awaiting_comp, + inflight_len, + mqueue_len, + mqueue_dropped, + awaiting_rel_len, + deliver_msg, + enqueue_msg, created_at], - ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " - "message_queue=~w, message_dropped=~w, " - "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " - "created_at=~w)~n", - [ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]). + ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight=~w, " + "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, " + "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n", + [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]). print(subscription, {Sub, Topic}) when is_pid(Sub) -> ?PRINT("~p -> ~s~n", [Sub, Topic]); From e008d149d3900940dfb07807bf85b63fdfb14f63 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 6 Mar 2017 18:57:29 +0800 Subject: [PATCH 11/16] Update comments and misc fix --- src/emqttd_broker.erl | 2 +- src/emqttd_client.erl | 2 +- src/emqttd_serializer.erl | 2 +- src/emqttd_session.erl | 8 ++++---- src/emqttd_ws.erl | 4 ++-- src/emqttd_ws_client.erl | 5 +++-- src/emqttd_ws_client_sup.erl | 2 +- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index ce2e4210c..4f8ade17d 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -113,7 +113,7 @@ stop_tick(TRef) -> timer:cancel(TRef). %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- init([]) -> diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index e3e68b8e4..70a9729fa 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -291,7 +291,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -%% Receive and parse tcp data +%% Receive and Parse TCP Data received(<<>>, State) -> {noreply, gc(State), hibernate}; diff --git a/src/emqttd_serializer.erl b/src/emqttd_serializer.erl index a47a23c8b..079cfbb3c 100644 --- a/src/emqttd_serializer.erl +++ b/src/emqttd_serializer.erl @@ -28,7 +28,7 @@ %% @doc Serialise MQTT Packet -spec(serialize(mqtt_packet()) -> iolist()). -serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type}, +serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type}, variable = Variable, payload = Payload}) -> serialize_header(Header, diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 288565a3a..85b027781 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -129,12 +129,12 @@ %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), - %% Awaiting PUBREL timeout - await_rel_timeout = 20000 :: timeout(), - %% Max Packets that Awaiting PUBREL max_awaiting_rel = 100 :: non_neg_integer(), + %% Awaiting PUBREL timeout + await_rel_timeout = 20000 :: timeout(), + %% Awaiting PUBREL timer await_rel_timer :: reference(), @@ -580,7 +580,7 @@ code_change(_OldVsn, Session, _Extra) -> {ok, Session}. %%-------------------------------------------------------------------- -%% Kick old client +%% Kickout old client %%-------------------------------------------------------------------- kick(_ClientId, undefined, _Pid) -> ignore; diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index b292c39bc..c6a68150d 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -37,8 +37,8 @@ %% @doc Handle WebSocket Request. handle_request(Req) -> - {ok, Env} = emqttd:env(protocol), - PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), + {ok, ProtoEnv} = emqttd:env(protocol), + PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), Parser = emqttd_parser:initial_state(PacketSize), %% Upgrade WebSocket. {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index bde322dcf..c89fae7e4 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -92,14 +92,15 @@ init([Env, WsPid, Req, ReplyChannel]) -> {ok, Peername} = Req:get(peername), Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), + Conn = Req:get(connection), ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel), [{ws_initial_headers, Headers} | Env]), IdleTimeout = get_value(client_idle_timeout, Env, 30000), EnableStats = get_value(client_enable_stats, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), - {ok, #wsclient_state{ws_pid = WsPid, + {ok, #wsclient_state{connection = Conn, + ws_pid = WsPid, peername = Peername, - connection = Req:get(connection), proto_state = ProtoState, enable_stats = EnableStats, force_gc_count = ForceGcCount}, diff --git a/src/emqttd_ws_client_sup.erl b/src/emqttd_ws_client_sup.erl index 1375ac036..21f683eaa 100644 --- a/src/emqttd_ws_client_sup.erl +++ b/src/emqttd_ws_client_sup.erl @@ -29,7 +29,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% @doc Start a WebSocket Client +%% @doc Start a WebSocket Connection. -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). start_client(WsPid, Req, ReplyChannel) -> supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]). From fba79b3e25cca674aac783fa57428b23bb5b7135 Mon Sep 17 00:00:00 2001 From: huangpj Date: Wed, 8 Mar 2017 18:01:59 +0800 Subject: [PATCH 12/16] support pbkdf2 --- Makefile | 3 ++- src/emqttd_auth_mod.erl | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 913f11b89..558d66180 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROJECT_VERSION = 2.1.0 NO_AUTOPATCH = cuttlefish -DEPS = gproc lager esockd mochiweb lager_syslog +DEPS = gproc lager esockd mochiweb lager_syslog pbkdf2 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 @@ -12,6 +12,7 @@ dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master dep_mochiweb = git https://github.com/emqtt/mochiweb dep_lager_syslog = git https://github.com/basho/lager_syslog +dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index c27f49d36..4ddf4c067 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -22,7 +22,7 @@ -export([passwd_hash/2]). --type(hash_type() :: plain | md5 | sha | sha256). +-type(hash_type() :: plain | md5 | sha | sha256 | pbkdf2). %%-------------------------------------------------------------------- %% Authentication behavihour @@ -51,7 +51,7 @@ behaviour_info(_Other) -> -endif. %% @doc Password Hash --spec(passwd_hash(hash_type(), binary()) -> binary()). +-spec(passwd_hash(hash_type(), binary() | tuple()) -> binary()). passwd_hash(plain, Password) -> Password; passwd_hash(md5, Password) -> @@ -59,7 +59,11 @@ passwd_hash(md5, Password) -> passwd_hash(sha, Password) -> hexstring(crypto:hash(sha, Password)); passwd_hash(sha256, Password) -> - hexstring(crypto:hash(sha256, Password)). + hexstring(crypto:hash(sha256, Password)); +passwd_hash(pbkdf2,{Salt,Password,Macfun,Iterations,Dklen}) -> + {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen), + hexstring(Hexstring). + hexstring(<>) -> iolist_to_binary(io_lib:format("~32.16.0b", [X])); From 25c25826a7690c78d3cb6e6fc03b9dbaad84b957 Mon Sep 17 00:00:00 2001 From: huangpengju <3310324470@qq.com> Date: Thu, 9 Mar 2017 09:28:28 +0800 Subject: [PATCH 13/16] Update Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 558d66180..e137d5e3f 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master dep_mochiweb = git https://github.com/emqtt/mochiweb dep_lager_syslog = git https://github.com/basho/lager_syslog -dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 +dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 ERLC_OPTS += +'{parse_transform, lager_transform}' From 78b0f6691019139fc2599b3e4773d4a6aaf1aa22 Mon Sep 17 00:00:00 2001 From: huangpengju <3310324470@qq.com> Date: Thu, 9 Mar 2017 09:29:03 +0800 Subject: [PATCH 14/16] Update Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e137d5e3f..6506d1fbe 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master dep_mochiweb = git https://github.com/emqtt/mochiweb dep_lager_syslog = git https://github.com/basho/lager_syslog -dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 +dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 ERLC_OPTS += +'{parse_transform, lager_transform}' From b5ff80499ad138d5bc545d5f352335be3e907813 Mon Sep 17 00:00:00 2001 From: huangpengju <3310324470@qq.com> Date: Thu, 9 Mar 2017 09:29:31 +0800 Subject: [PATCH 15/16] Update emqttd_auth_mod.erl --- src/emqttd_auth_mod.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 4ddf4c067..8f0b7405a 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -62,8 +62,7 @@ passwd_hash(sha256, Password) -> hexstring(crypto:hash(sha256, Password)); passwd_hash(pbkdf2,{Salt,Password,Macfun,Iterations,Dklen}) -> {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen), - hexstring(Hexstring). - + pbkdf2:to_hex(Hexstring). hexstring(<>) -> iolist_to_binary(io_lib:format("~32.16.0b", [X])); From 61a71e7559f06f0cee6fbcc2db2f2a170e3b2497 Mon Sep 17 00:00:00 2001 From: huangpengju <3310324470@qq.com> Date: Sun, 12 Mar 2017 11:49:26 +0800 Subject: [PATCH 16/16] update emqttd_auth_mod.erl code format --- src/emqttd_auth_mod.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 8f0b7405a..ff7f79a20 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -60,7 +60,7 @@ passwd_hash(sha, Password) -> hexstring(crypto:hash(sha, Password)); passwd_hash(sha256, Password) -> hexstring(crypto:hash(sha256, Password)); -passwd_hash(pbkdf2,{Salt,Password,Macfun,Iterations,Dklen}) -> +passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) -> {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen), pbkdf2:to_hex(Hexstring).