diff --git a/CHANGELOG.md b/CHANGELOG.md index d95eb02eb..bf291b55b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ emqttd ChangeLog ================== +0.12.1-beta (2015-10-15) +------------------------- + +Highlight: Release for Bugfix and Code Refactor. + +Feature: Retained message expiration (#182) + +Improve: '$SYS/#' publish will not match '#' or '+/#' (#68) + +Improve: Add more metrics and ignore '$SYS/#' publish (#266) + +Improve: emqttd_sm should be optimized for clustered nodes may be crashed (#282) + +Improve: Refactor emqttd_sysmon and suppress 'monitor' messages (#328) + +Task: benchmark for 0.12.0 release (#225) + +Benchmark: About 900K concurrent connections established on a 20Core, 32G CentOS server. + + 0.12.0-beta (2015-10-08) ------------------------- diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index c5adb66b3..b7f2c5743 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -209,20 +209,20 @@ #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{return_code = ReturnCode}}). +-define(PUBLISH_PACKET(Qos, PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = Qos}, + variable = #mqtt_packet_publish{packet_id = PacketId}}). + -define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = Qos}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = Qos}, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, - payload = Payload}). - --define(PUBLISH(Qos, PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = Qos}, - variable = #mqtt_packet_publish{packet_id = PacketId}}). + payload = Payload}). -define(PUBACK_PACKET(Type, PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = Type}, + #mqtt_packet{header = #mqtt_packet_header{type = Type}, variable = #mqtt_packet_puback{packet_id = PacketId}}). -define(PUBREL_PACKET(PacketId), diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index cebe8b562..b72f68a69 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -113,33 +113,40 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -received(Packet = ?PACKET(Type)) -> +%%------------------------------------------------------------------------------ +%% @doc Count packets received. +%% @end +%%------------------------------------------------------------------------------ +-spec received(mqtt_packet()) -> ok. +received(Packet) -> inc('packets/received'), - received(Type, Packet). -received(?CONNECT, _Packet) -> - inc('packets/connect'); -received(?PUBLISH, ?PUBLISH(Qos, _PktId)) -> + received1(Packet). +received1(?PUBLISH_PACKET(Qos, _PktId)) -> inc('packets/publish/received'), inc('messages/received'), qos_received(Qos); -received(?PUBACK, _Packet) -> +received1(?PACKET(Type)) -> + received2(Type). +received2(?CONNECT) -> + inc('packets/connect'); +received2(?PUBACK) -> inc('packets/puback/received'); -received(?PUBREC, _Packet) -> +received2(?PUBREC) -> inc('packets/pubrec/received'); -received(?PUBREL, _Packet) -> +received2(?PUBREL) -> inc('packets/pubrel/received'); -received(?PUBCOMP, _Packet) -> +received2(?PUBCOMP) -> inc('packets/pubcomp/received'); -received(?SUBSCRIBE, _Packet) -> +received2(?SUBSCRIBE) -> inc('packets/subscribe'); -received(?UNSUBSCRIBE, _Packet) -> +received2(?UNSUBSCRIBE) -> inc('packets/unsubscribe'); -received(?PINGREQ, _Packet) -> +received2(?PINGREQ) -> inc('packets/pingreq'); -received(?DISCONNECT, _Packet) -> +received2(?DISCONNECT) -> inc('packets/disconnect'); -received(_, _) -> ignore. - +received2(_) -> + ignore. qos_received(?QOS_0) -> inc('messages/qos0/received'); qos_received(?QOS_1) -> @@ -147,32 +154,40 @@ qos_received(?QOS_1) -> qos_received(?QOS_2) -> inc('messages/qos2/received'). -sent(Packet = ?PACKET(Type)) -> +%%------------------------------------------------------------------------------ +%% @doc Count packets received. Will not count $SYS PUBLISH. +%% @end +%%------------------------------------------------------------------------------ +-spec sent(mqtt_packet()) -> ok. +sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) -> + ignore; +sent(Packet) -> emqttd_metrics:inc('packets/sent'), - sent(Type, Packet). -sent(?CONNACK, _Packet) -> - inc('packets/connack'); -sent(?PUBLISH, ?PUBLISH(Qos, _PktId)) -> + sent1(Packet). +sent1(?PUBLISH_PACKET(Qos, _PktId)) -> inc('packets/publish/sent'), inc('messages/sent'), qos_sent(Qos); -sent(?PUBACK, _Packet) -> +sent1(?PACKET(Type)) -> + sent2(Type). +sent2(?CONNACK) -> + inc('packets/connack'); +sent2(?PUBACK) -> inc('packets/puback/sent'); -sent(?PUBREC, _Packet) -> +sent2(?PUBREC) -> inc('packets/pubrec/sent'); -sent(?PUBREL, _Packet) -> +sent2(?PUBREL) -> inc('packets/pubrel/sent'); -sent(?PUBCOMP, _Packet) -> +sent2(?PUBCOMP) -> inc('packets/pubcomp/sent'); -sent(?SUBACK, _Packet) -> +sent2(?SUBACK) -> inc('packets/suback'); -sent(?UNSUBACK, _Packet) -> +sent2(?UNSUBACK) -> inc('packets/unsuback'); -sent(?PINGRESP, _Packet) -> +sent2(?PINGRESP) -> inc('packets/pingresp'); -sent(_Type, _Packet) -> +sent2(_Type) -> ingore. - qos_sent(?QOS_0) -> inc('messages/qos0/sent'); qos_sent(?QOS_1) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 9eab5fe14..ce85ddab3 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -261,12 +261,12 @@ process(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -publish(Packet = ?PUBLISH(?QOS_0, _PacketId), +publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) -> Msg = emqttd_message:from_packet(ClientId, Packet), emqttd_session:publish(Session, Msg); -publish(Packet = ?PUBLISH(?QOS_1, PacketId), +publish(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> Msg = emqttd_message:from_packet(ClientId, Packet), case emqttd_session:publish(Session, Msg) of @@ -276,7 +276,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error]) end; -publish(Packet = ?PUBLISH(?QOS_2, PacketId), +publish(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> Msg = emqttd_message:from_packet(ClientId, Packet), case emqttd_session:publish(Session, Msg) of