From 52eae659832ba529f97290e1ce0572cea66b5939 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Mon, 8 Oct 2018 20:02:32 +0800 Subject: [PATCH 01/21] Fix topic_name validation bug Prior to this change, Prior to this change, the validation for the mqtt5.0 publish packet which both contains zero-length topic name and topic alias is wrong. This change fix this bug. --- src/emqx_packet.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 1d1e29c6a..7c51688dc 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -55,6 +55,8 @@ validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> validate_packet_id(PacketId) andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); +validate(?PUBLISH_PACKET(_QoS, <<>>, #{'Topic-Alias':= _I}, _)) -> + true; validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) -> error(topic_name_invalid); validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) -> From 9bcd4c3e0815a2756b3dbb1191418c0fd1c21453 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 9 Oct 2018 13:35:27 +0800 Subject: [PATCH 02/21] improve will message --- src/emqx_protocol.erl | 31 ++++++++++++++++++++++++------- src/emqx_session.erl | 2 +- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1fb80c0ce..0a5858e44 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -260,6 +260,7 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, properties = ConnProps, + will_props = WillProps, will_topic = WillTopic, client_id = ClientId, username = Username, @@ -267,7 +268,16 @@ process_packet(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - WillMsg = emqx_packet:will_msg(Connect), + Connect1 = if + ProtoVer =:= ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + SessionExpiryInterval = get_property('Session-Expiry-Interval', ConnProps, 0), + WillProps1 = set_property('Will-Delay-Interval', erlang:min(SessionExpiryInterval, WillDelayInterval), WillProps), + Connect#mqtt_packet_connect{will_props = WillProps1}; + true -> + Connect + end, + WillMsg = emqx_packet:will_msg(Connect1), PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -642,6 +652,11 @@ set_property(Name, Value, undefined) -> set_property(Name, Value, Props) -> Props#{Name => Value}. +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default). + %%------------------------------------------------------------------------------ %% Check Packet %%------------------------------------------------------------------------------ @@ -777,20 +792,22 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, client_id = ClientId, - will_msg = WillMsg}) -> + will_msg = WillMsg, + session = Session}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState), - _ = send_willmsg(WillMsg), + _ = send_willmsg(WillMsg, Session), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). -send_willmsg(undefined) -> +send_willmsg(undefined, _Session) -> ignore; send_willmsg(WillMsg = #message{topic = Topic, - headers = #{'Will-Delay-Interval' := Interval}}) + headers = #{'Will-Delay-Interval' := Interval}}, Session) when is_integer(Interval), Interval > 0 -> SendAfter = integer_to_binary(Interval), - emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); -send_willmsg(WillMsg) -> + Session1 = list_to_binary(pid_to_list(Session)), + emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>}); +send_willmsg(WillMsg, _Session) -> emqx_broker:publish(WillMsg). start_keepalive(0, _PState) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d327723f5..b63b45b18 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), - emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), + emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]), %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1))); From b80ba6e458b97d44baf53ce286a7bcad9632dfe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 9 Oct 2018 14:19:22 +0800 Subject: [PATCH 03/21] Fix bug when ConnProps is undefined --- src/emqx_protocol.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 0a5858e44..578417085 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -610,14 +610,14 @@ try_open_session(PState = #pstate{zone = Zone, set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Receive-Maximum', ConnProps, 65535); + get_property('Receive-Maximum', ConnProps, 65535); true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Session-Expiry-Interval', ConnProps, 0); + get_property('Session-Expiry-Interval', ConnProps, 0); true -> case CleanStart of true -> 0; @@ -628,7 +628,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Topic-Alias-Maximum', ConnProps, 0); + get_property('Topic-Alias-Maximum', ConnProps, 0); true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); From 29beb42aa2981f91b7482fda6a9f8755d48bf2a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 10 Oct 2018 14:00:17 +0800 Subject: [PATCH 04/21] Using client id rather then session pid --- src/emqx_protocol.erl | 14 ++++++-------- src/emqx_session.erl | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 578417085..bd440cf39 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -792,22 +792,20 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, client_id = ClientId, - will_msg = WillMsg, - session = Session}) -> + will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState), - _ = send_willmsg(WillMsg, Session), + _ = send_willmsg(WillMsg, ClientId), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). -send_willmsg(undefined, _Session) -> +send_willmsg(undefined, _ClientId) -> ignore; send_willmsg(WillMsg = #message{topic = Topic, - headers = #{'Will-Delay-Interval' := Interval}}, Session) + headers = #{'Will-Delay-Interval' := Interval} = Headers}, ClientId) when is_integer(Interval), Interval > 0 -> SendAfter = integer_to_binary(Interval), - Session1 = list_to_binary(pid_to_list(Session)), - emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>}); -send_willmsg(WillMsg, _Session) -> + emqx_broker:publish(WillMsg#message{topic = emqx_topic:join([<<"$will">>, SendAfter, Topic]), headers = Headers#{client_id => ClientId}}); +send_willmsg(WillMsg, _ClientId) -> emqx_broker:publish(WillMsg). start_keepalive(0, _PState) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b63b45b18..d327723f5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), - emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]), + emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1))); From 12da23066231a5712de606fa491476987b64adaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 10 Oct 2018 15:53:01 +0800 Subject: [PATCH 05/21] Increase coverage --- test/emqx_keepalive_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index c4dbd80f2..60472fd42 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -26,6 +26,7 @@ groups() -> [{keepalive, [], [t_keepalive]}]. %%-------------------------------------------------------------------- t_keepalive(_) -> + {ok, _} = emqx_keepalive:start(fun() -> {ok, 1} end, 0, {keepalive, timeout}), {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). From 6ffd0ac44f5fbf665efb869e2994549952bd1f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 10 Oct 2018 18:35:47 +0800 Subject: [PATCH 06/21] Increase coverage for emqx_protocol --- test/emqx_SUITE.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 166d64a30..ddff04490 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -39,6 +39,13 @@ clean_start = false, password = <<"public">>})). +-define(CLIENT3, ?CONNECT_PACKET(#mqtt_packet_connect{ + username = <<"admin">>, + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + password = <<"public">>, + will_props = #{'Will-Delay-Interval' => 2}})). + -define(SUBCODE, [0]). -define(PACKETID, 1). @@ -67,6 +74,7 @@ groups() -> [ mqtt_connect, mqtt_connect_with_tcp, + mqtt_connect_with_will_props, mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws @@ -110,6 +118,14 @@ mqtt_connect_with_tcp(_) -> {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), emqx_client_sock:close(Sock). +mqtt_connect_with_will_props(_) -> + %% Issue #599 + %% Empty clientId and clean_session = false + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = raw_send_serialise(?CLIENT3), + emqx_client_sock:send(Sock, Packet), + emqx_client_sock:close(Sock). + mqtt_connect_with_ssl_oneway(_) -> emqx:shutdown(), emqx_ct_broker_helpers:change_opts(ssl_oneway), From 3f761cbe6aa3979636db3c21e94e32f95e8368ad Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Tue, 23 Oct 2018 14:37:05 +0800 Subject: [PATCH 07/21] Support use certifate as username Prior to this change, you can just use CN or EN field from the client certificate as username. This change add a new option to allow user to use Certificate directly as username. --- etc/emqx.conf | 6 +++--- priv/emqx.schema | 6 +++--- src/emqx_protocol.erl | 7 ++++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 874a4c560..56fcf5ffc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1159,10 +1159,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: on | off ## listener.ssl.external.honor_cipher_order = on -## Use the CN field from the client certificate as a username. +## Use the CN, EN or CRT field from the client certificate as a username. ## Notice that 'verify' should be set as 'verify_peer'. ## -## Value: cn | en +## Value: cn | en | crt ## listener.ssl.external.peer_cert_as_username = cn ## TCP backlog for the SSL connection. @@ -1522,7 +1522,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## See: listener.ssl.$name.peer_cert_as_username ## -## Value: cn | dn +## Value: cn | dn | crt ## listener.wss.external.peer_cert_as_username = cn ## TCP backlog for the WebSocket/SSL connection. diff --git a/priv/emqx.schema b/priv/emqx.schema index becb4bff4..1424ab240 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -949,7 +949,7 @@ end}. ]}. {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [ - {datatype, {enum, [cn, dn]}} + {datatype, {enum, [cn, dn, crt]}} ]}. {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ @@ -1139,7 +1139,7 @@ end}. ]}. {mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners", [ - {datatype, {enum, [cn, dn]}} + {datatype, {enum, [cn, dn, crt]}} ]}. %%-------------------------------------------------------------------- @@ -1400,7 +1400,7 @@ end}. ]}. {mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners", [ - {datatype, {enum, [cn, dn]}} + {datatype, {enum, [cn, dn, crt]}} ]}. {translation, "emqx.listeners", fun(Conf) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c3f0689fa..db239acef 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -106,9 +106,10 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of - cn -> esockd_peercert:common_name(Peercert); - dn -> esockd_peercert:subject(Peercert); - _ -> undefined + cn -> esockd_peercert:common_name(Peercert); + dn -> esockd_peercert:subject(Peercert); + crt -> Peercert; + _ -> undefined end. set_username(Username, PState = #pstate{username = undefined}) -> From 540484e6034ff3daeeb04132eab123d09eb33f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 23 Oct 2018 17:27:46 +0800 Subject: [PATCH 08/21] Send and cancel will message by apis rather than hook --- src/emqx_protocol.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7dd0b37e8..32ecd7137 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -638,6 +638,9 @@ try_open_session(PState = #pstate{zone = Zone, case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> {ok, SPid, false}; + {ok, SPid, true} -> + emqx_delayed_publish:cancel_publish(ClientId), + {ok, SPid, true}; Other -> Other end. @@ -838,11 +841,9 @@ shutdown(Reason, PState = #pstate{connected = true, send_willmsg(undefined, _ClientId) -> ignore; -send_willmsg(WillMsg = #message{topic = Topic, - headers = #{'Will-Delay-Interval' := Interval} = Headers}, ClientId) +send_willmsg(WillMsg = #message{headers = #{'Will-Delay-Interval' := Interval}}, ClientId) when is_integer(Interval), Interval > 0 -> - SendAfter = integer_to_binary(Interval), - emqx_broker:publish(WillMsg#message{topic = emqx_topic:join([<<"$will">>, SendAfter, Topic]), headers = Headers#{client_id => ClientId}}); + emqx_delayed_publish:delay_publish(WillMsg, ClientId); send_willmsg(WillMsg, _ClientId) -> emqx_broker:publish(WillMsg). From a1092a678473e5eccd16a52d5621a37ffff7aefe Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 24 Oct 2018 15:59:22 +0800 Subject: [PATCH 09/21] Add eunit cases and fix typo. --- src/emqx_protocol.erl | 62 +++++++++++++++++++----------------- test/emqx_protocol_tests.erl | 30 +++++++++++++++++ 2 files changed, 62 insertions(+), 30 deletions(-) create mode 100644 test/emqx_protocol_tests.erl diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index db239acef..fdec62d8b 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -33,35 +33,35 @@ -export([shutdown/2]). -record(pstate, { - zone, - sendfun, - peername, - peercert, - proto_ver, - proto_name, - ackprops, - client_id, - is_assigned, - conn_pid, - conn_props, - ack_props, - username, - session, - clean_start, - topic_aliases, - packet_size, - will_topic, - will_msg, - keepalive, - mountpoint, - is_super, - is_bridge, - enable_ban, - enable_acl, - recv_stats, - send_stats, - connected, - connected_at + zone, + sendfun, + peername, + peercert, + proto_ver, + proto_name, + ackprops, + client_id, + is_assigned, + conn_pid, + conn_props, + ack_props, + username, + session, + clean_start, + topic_aliases, + packet_size, + will_topic, + will_msg, + keepalive, + mountpoint, + is_super, + is_bridge, + enable_ban, + enable_acl, + recv_stats, + send_stats, + connected, + connected_at }). -type(state() :: #pstate{}). @@ -71,6 +71,8 @@ -compile(export_all). -endif. +-define(NO_PROPS, undefined). + -define(LOG(Level, Format, Args, PState), emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format, [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). @@ -672,7 +674,7 @@ authenticate(Credentials, Password) -> {error, Error} end. -set_property(Name, Value, undefined) -> +set_property(Name, Value, ?NO_PROPS) -> #{Name => Value}; set_property(Name, Value, Props) -> Props#{Name => Value}. diff --git a/test/emqx_protocol_tests.erl b/test/emqx_protocol_tests.erl new file mode 100644 index 000000000..56b65e36a --- /dev/null +++ b/test/emqx_protocol_tests.erl @@ -0,0 +1,30 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_protocol_tests). + +-include_lib("eunit/include/eunit.hrl"). + +set_property_test() -> + ?assertEqual(#{test => test_property}, emqx_protocol:set_property(test, test_property, undefined)), + TestMap = #{test => test_property}, + ?assertEqual(#{test => test_property, test1 => test_property2}, + emqx_protocol:set_property(test1, test_property2, TestMap)), + ok. + +init_username_test() -> + ?assertEqual(<<"Peercert">>, + emqx_protocol:init_username(<<"Peercert">>, [{peer_cert_as_username, crt}])), + ?assertEqual(undefined, + emqx_protocol:init_username(undefined, [{peer_cert_as_username, undefined}])). From 6675e3d496b9d6e4f195a9d59382a7a68737e7a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 25 Oct 2018 14:26:31 +0800 Subject: [PATCH 10/21] Implement will message delay publish in session, add test case for clean start and will message in connect packet --- src/emqx_protocol.erl | 27 ++------ src/emqx_session.erl | 83 ++++++++++++++++-------- src/emqx_sm.erl | 19 +++--- test/emqx_protocol_SUITE.erl | 119 +++++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 59 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 32ecd7137..2c516a534 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -50,7 +50,6 @@ clean_start, topic_aliases, packet_size, - will_topic, will_msg, keepalive, mountpoint, @@ -142,7 +141,6 @@ attrs(#pstate{zone = Zone, proto_ver = ProtoVer, proto_name = ProtoName, keepalive = Keepalive, - will_topic = WillTopic, mountpoint = Mountpoint, is_super = IsSuper, is_bridge = IsBridge, @@ -156,7 +154,6 @@ attrs(#pstate{zone = Zone, {proto_name, ProtoName}, {clean_start, CleanStart}, {keepalive, Keepalive}, - {will_topic, WillTopic}, {mountpoint, Mountpoint}, {is_super, IsSuper}, {is_bridge, IsBridge}, @@ -285,7 +282,6 @@ process_packet(?CONNECT_PACKET( keepalive = Keepalive, properties = ConnProps, will_props = WillProps, - will_topic = WillTopic, client_id = ClientId, username = Username, password = Password} = Connect), PState) -> @@ -310,7 +306,6 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, - will_topic = WillTopic, will_msg = WillMsg, is_bridge = IsBridge, connected_at = os:timestamp()}), @@ -535,7 +530,6 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, 'Response-Information' => ResponseInformation, - 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if @@ -624,23 +618,22 @@ try_open_session(PState = #pstate{zone = Zone, client_id = ClientId, conn_pid = ConnPid, username = Username, - clean_start = CleanStart}) -> + clean_start = CleanStart, + will_msg = WillMsg}) -> SessAttrs = #{ zone => Zone, client_id => ClientId, conn_pid => ConnPid, username => Username, - clean_start => CleanStart + clean_start => CleanStart, + will_msg => WillMsg }, SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> {ok, SPid, false}; - {ok, SPid, true} -> - emqx_delayed_publish:cancel_publish(ClientId), - {ok, SPid, true}; Other -> Other end. @@ -832,21 +825,11 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; Reason =:= discard -> emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, - client_id = ClientId, - will_msg = WillMsg}) -> + client_id = ClientId}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState), - _ = send_willmsg(WillMsg, ClientId), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). -send_willmsg(undefined, _ClientId) -> - ignore; -send_willmsg(WillMsg = #message{headers = #{'Will-Delay-Interval' := Interval}}, ClientId) - when is_integer(Interval), Interval > 0 -> - emqx_delayed_publish:delay_publish(WillMsg, ClientId); -send_willmsg(WillMsg, _ClientId) -> - emqx_broker:publish(WillMsg). - start_keepalive(0, _PState) -> ignore; start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d327723f5..15911ca7f 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -46,7 +46,7 @@ -export([start_link/1]). -export([info/1, attrs/1]). -export([stats/1]). --export([resume/2, discard/2]). +-export([resume/3, discard/2]). -export([update_expiry_interval/2, update_misc/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). @@ -147,7 +147,11 @@ %% Created at created_at :: erlang:timestamp(), - topic_alias_maximum :: pos_integer() + topic_alias_maximum :: pos_integer(), + + will_msg :: emqx:message(), + + will_delay_timer :: reference() | undefined }). -type(spid() :: pid()). @@ -307,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(spid(), pid()) -> ok). -resume(SPid, ConnPid) -> - gen_server:cast(SPid, {resume, ConnPid}). +-spec(resume(spid(), pid(), emqx:message()) -> ok). +resume(SPid, ConnPid, WillMsg) -> + gen_server:cast(SPid, {resume, ConnPid, WillMsg}). %% @doc Discard the session -spec(discard(spid(), ByPid :: pid()) -> ok). @@ -338,7 +342,8 @@ init([Parent, #{zone := Zone, clean_start := CleanStart, expiry_interval := ExpiryInterval, max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}]) -> + topic_alias_maximum := TopicAliasMaximum, + will_msg := WillMsg}]) -> process_flag(trap_exit, true), true = link(ConnPid), IdleTimout = get_env(Zone, idle_timeout, 30000), @@ -362,7 +367,8 @@ init([Parent, #{zone := Zone, deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp(), - topic_alias_maximum = TopicAliasMaximum + topic_alias_maximum = TopicAliasMaximum, + will_msg = WillMsg }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), @@ -511,17 +517,18 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, - conn_pid = OldConnPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer}) -> +handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer, + will_delay_timer = WillDelayTimer}) -> ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]), + lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State); @@ -530,14 +537,16 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, true = link(ConnPid), - State1 = State#state{conn_pid = ConnPid, - binding = binding(ConnPid), - old_conn_pid = OldConnPid, - clean_start = false, - retry_timer = undefined, - awaiting_rel = #{}, - await_rel_timer = undefined, - expiry_timer = undefined}, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, + clean_start = false, + retry_timer = undefined, + awaiting_rel = #{}, + await_rel_timer = undefined, + expiry_timer = undefined, + will_delay_timer = undefined, + will_msg = WillMsg}, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), @@ -610,13 +619,19 @@ handle_info({timeout, Timer, emit_stats}, end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), - shutdown(expired, State); + shutdown(expired, State#state{will_msg = undefined}); -handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> - {stop, Reason, State#state{conn_pid = undefined}}; +handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> + send_willmsg(WillMsg), + {noreply, State#state{will_msg = undefined}}; + +handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> + send_willmsg(WillMsg), + {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> - {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; + State1 = ensure_will_delay_timer(State), + {noreply, ensure_expire_timer(State1#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> %% ignore @@ -631,8 +646,9 @@ handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) -> +terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) -> emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), + send_willmsg(WillMsg), %% Ensure to shutdown the connection if ConnPid =/= undefined -> @@ -714,6 +730,14 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, ensure_retry_timer(Interval - max(0, Age), State) end. +%%------------------------------------------------------------------------------ +%% Send Will Message +%%------------------------------------------------------------------------------ +send_willmsg(undefined) -> + ignore; +send_willmsg(WillMsg) -> + emqx_broker:publish(WillMsg). + %%------------------------------------------------------------------------------ %% Expire Awaiting Rel %%------------------------------------------------------------------------------ @@ -899,6 +923,11 @@ ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > ensure_expire_timer(State) -> State. +ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) -> + State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)}; +ensure_will_delay_timer(State) -> + State. + ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index d45548a78..963cab1e4 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -59,13 +59,12 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, - client_id := ClientId, - conn_pid := ConnPid, - max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}) -> +open_session(SessAttrs = #{clean_start := false, + client_id := ClientId, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}) -> ResumeStart = fun(_) -> - case resume_session(ClientId, ConnPid) of + case resume_session(ClientId, SessAttrs) of {ok, SPid} -> emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}), {ok, SPid, true}; @@ -92,13 +91,13 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> %% @doc Try to resume a session. -spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}). resume_session(ClientId) -> - resume_session(ClientId, self()). + resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}). -resume_session(ClientId, ConnPid) -> +resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) -> case lookup_session(ClientId) of [] -> {error, not_found}; [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, ConnPid), + ok = emqx_session:resume(SPid, ConnPid, WillMsg), {ok, SPid}; Sessions -> [{_, SPid}|StaleSessions] = lists:reverse(Sessions), @@ -106,7 +105,7 @@ resume_session(ClientId, ConnPid) -> lists:foreach(fun({_, StalePid}) -> catch emqx_session:discard(StalePid, ConnPid) end, StaleSessions), - ok = emqx_session:resume(SPid, ConnPid), + ok = emqx_session:resume(SPid, ConnPid, WillMsg), {ok, SPid} end. diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index f97f475f8..060557c6c 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -131,6 +131,125 @@ connect_v5(_) -> #{'Response-Information' := _RespInfo}), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), + + % test clean start + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = true, + client_id = <<"myclient">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + emqx_client_sock:send(Sock, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_SUCCESS) + )) + end), + + timer:sleep(1000), + + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + client_id = <<"myclient">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + end), + + % test will message publish and cancel + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = true, + client_id = <<"myclient">>, + will_flag = true, + will_qos = ?QOS_1, + will_retain = false, + will_props = #{'Will-Delay-Interval' => 5}, + will_topic = <<"TopicA">>, + will_payload = <<"will message">>, + properties = #{'Session-Expiry-Interval' => 3} + } + ) + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + + {ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, + {active, false}], 3000), + + do_connect(Sock2, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5})), + + {ok, SubData} = gen_tcp:recv(Sock2, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ) + ), + + {error, timeout} = gen_tcp:recv(Sock2, 0, 1000), + + % session resumed + {ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, + {active, false}], 3000), + + emqx_client_sock:send(Sock3, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + client_id = <<"myclient">>, + will_flag = true, + will_qos = ?QOS_1, + will_retain = false, + will_props = #{'Will-Delay-Interval' => 5}, + will_topic = <<"TopicA">>, + will_payload = <<"will message 2">>, + properties = #{'Session-Expiry-Interval' => 3} + } + ) + ) + ), + {ok, Data3} = gen_tcp:recv(Sock3, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock3, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ) + ), + + {ok, WillData} = gen_tcp:recv(Sock2, 0), + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), + + emqx_client_sock:close(Sock2) + end), ok. do_connect(Sock, ProtoVer) -> From db2e47470a1dfd4275d598233221dc183866275f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 25 Oct 2018 16:19:57 +0800 Subject: [PATCH 11/21] Fix bugs --- src/emqx_protocol.erl | 22 +++++++++++----------- src/emqx_session.erl | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 2c516a534..d981309e4 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -281,23 +281,13 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, properties = ConnProps, - will_props = WillProps, client_id = ClientId, username = Username, password = Password} = Connect), PState) -> %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - Connect1 = if - ProtoVer =:= ?MQTT_PROTO_V5 -> - WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), - SessionExpiryInterval = get_property('Session-Expiry-Interval', ConnProps, 0), - WillProps1 = set_property('Will-Delay-Interval', erlang:min(SessionExpiryInterval, WillDelayInterval), WillProps), - Connect#mqtt_packet_connect{will_props = WillProps1}; - true -> - Connect - end, - WillMsg = emqx_packet:will_msg(Connect1), + WillMsg = make_will_msg(Connect), PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -687,6 +677,16 @@ get_property(_Name, undefined, Default) -> get_property(Name, Props, Default) -> maps:get(Name, Props, Default). +make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, + will_props = WillProps} = Connect) -> + emqx_packet:will_msg(if + ProtoVer =:= ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; + true -> + Connect + end). + %%------------------------------------------------------------------------------ %% Check Packet %%------------------------------------------------------------------------------ diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 15911ca7f..224b2be82 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -619,7 +619,7 @@ handle_info({timeout, Timer, emit_stats}, end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), - shutdown(expired, State#state{will_msg = undefined}); + shutdown(expired, State); handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> send_willmsg(WillMsg), From ffa220a87d4caf876bcc30c3adad51fbf37ed5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 26 Oct 2018 10:46:56 +0800 Subject: [PATCH 12/21] Fix bug in test case --- test/emqx_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 8d8819a8f..c6bea732a 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -121,7 +121,7 @@ mqtt_connect_with_will_props(_) -> %% Issue #599 %% Empty clientId and clean_session = false {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), - Packet = raw_send_serialise(?CLIENT3), + Packet = raw_send_serialize(?CLIENT3), emqx_client_sock:send(Sock, Packet), emqx_client_sock:close(Sock). From 7544a21e25b7e93ec8dc01dead69fbb67632a692 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 26 Oct 2018 14:04:33 +0800 Subject: [PATCH 13/21] Add test cases for emqx_bridge, emqx_mod_rewrite (#1914) --- Makefile | 2 +- src/emqx_mod_rewrite.erl | 1 - test/emqx_bridge_SUITE.erl | 57 +++++++++++++++++++++++++++++ test/emqx_mod_rewrite_tests.erl | 63 +++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 test/emqx_bridge_SUITE.erl create mode 100644 test/emqx_mod_rewrite_tests.erl diff --git a/Makefile b/Makefile index dbd503864..4fdaabe92 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 29dbb660c..25faef166 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -75,4 +75,3 @@ compile(Rules) -> {ok, MP} = re:compile(Re), {rewrite, Topic, MP, Dest} end, Rules). - diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl new file mode 100644 index 000000000..f337e3b4e --- /dev/null +++ b/test/emqx_bridge_SUITE.erl @@ -0,0 +1,57 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [bridge_test]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +bridge_test(_) -> + {ok, _Pid} = emqx_bridge:start_link(emqx, []), + #{msg := <<"start bridge successfully">>} + = emqx_bridge:start_bridge(emqx), + test_forwards(), + test_subscriptions(0), + test_subscriptions(1), + test_subscriptions(2), + #{msg := <<"stop bridge successfully">>} + = emqx_bridge:stop_bridge(emqx), + ok. + +test_forwards() -> + emqx_bridge:add_forward(emqx, <<"test_forwards">>), + [<<"test_forwards">>] = emqx_bridge:show_forwards(emqx), + emqx_bridge:del_forward(emqx, <<"test_forwards">>), + [] = emqx_bridge:show_forwards(emqx), + ok. + +test_subscriptions(QoS) -> + emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS), + [{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx), + emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>), + [] = emqx_bridge:show_subscriptions(emqx), + ok. diff --git a/test/emqx_mod_rewrite_tests.erl b/test/emqx_mod_rewrite_tests.erl new file mode 100644 index 000000000..6fea7ee71 --- /dev/null +++ b/test/emqx_mod_rewrite_tests.erl @@ -0,0 +1,63 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_mod_rewrite_tests). + +-include_lib("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +rules() -> + Rawrules1 = "x/# ^x/y/(.+)$ z/y/$1", + Rawrules2 = "y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2", + Rawrules = [Rawrules1, Rawrules2], + Rules = lists:map(fun(Rule) -> + [Topic, Re, Dest] = string:tokens(Rule, " "), + {rewrite, + list_to_binary(Topic), + list_to_binary(Re), + list_to_binary(Dest)} + end, Rawrules), + lists:map(fun({rewrite, Topic, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, Topic, MP, Dest} + end, Rules). + +rewrite_subscribe_test() -> + Rules = rules(), + io:format("Rules: ~p",[Rules]), + ?assertEqual({ok, [{<<"test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"z/y/test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)). + +rewrite_unsubscribe_test() -> + Rules = rules(), + ?assertEqual({ok, [{<<"test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"z/y/test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)). + +rewrite_publish_test() -> + Rules = rules(), + ?assertMatch({ok, #message{topic = <<"test">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"test">>}, Rules)), + ?assertMatch({ok, #message{topic = <<"z/y/test">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"x/y/test">>}, Rules)), + ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"y/test/z/test_topic">>}, Rules)). From e56252dac6a2448c85b8a5d200d2db171bea8800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 26 Oct 2018 14:34:22 +0800 Subject: [PATCH 14/21] Fix bugs in test cases --- test/emqx_mock_client.erl | 17 +++++++++-------- test/emqx_sm_SUITE.erl | 11 +++++++++-- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 4a49a1fc5..24de66d0a 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -46,14 +46,15 @@ init([ClientId]) -> }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> - Attrs = #{ zone => Zone, - client_id => ClientId, - conn_pid => ClientPid, - clean_start => true, - username => undefined, - expiry_interval => 0, - max_inflight => 0, - topic_alias_maximum => 0 + Attrs = #{ zone => Zone, + client_id => ClientId, + conn_pid => ClientPid, + clean_start => true, + username => undefined, + expiry_interval => 0, + max_inflight => 0, + topic_alias_maximum => 0, + will_msg => undefined }, {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 110e13026..2b83b6afb 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -24,8 +24,15 @@ all() -> [t_open_close_session]. t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0}, + Attrs = #{clean_start => true, + client_id => <<"client">>, + conn_pid => ClientPid, + zone => internal, + username => <<"zhou">>, + expiry_interval => 0, + max_inflight => 0, + topic_alias_maximum => 0, + will_msg => undefined}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>), From 0b44c1b75f4b4c2246d720947e2e0bc459f609a2 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 26 Oct 2018 17:25:31 +0800 Subject: [PATCH 15/21] improve_test_cases --- Makefile | 2 +- test/emqx_connection_SUITE.erl | 47 ---------------------------------- 2 files changed, 1 insertion(+), 48 deletions(-) delete mode 100644 test/emqx_connection_SUITE.erl diff --git a/Makefile b/Makefile index 4fdaabe92..90a46b9cd 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ EUNIT_OPTS = verbose # CT_SUITES = emqx_frame ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \ +CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl deleted file mode 100644 index 716e771b5..000000000 --- a/test/emqx_connection_SUITE.erl +++ /dev/null @@ -1,47 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_connection_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). - -all() -> - [{group, connection}]. - -groups() -> - [{connection, [sequence], [t_attrs]}]. - -init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), - Config. - -end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). - - -t_attrs(_) -> - {ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]), - [{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>), - Attrs = emqx_connection:attrs(ConnPid), - <<"simpleClient">> = proplists:get_value(client_id, Attrs), - <<"plain">> = proplists:get_value(username, Attrs), - emqx_client:disconnect(C). - -%% t_stats() -> -%% {ok, C, _ } = emqx_client; -%% t_stats() -> - From abb2e5c918be8eb2547cff5cc47532b5bf31c435 Mon Sep 17 00:00:00 2001 From: tigercl Date: Fri, 26 Oct 2018 17:27:02 +0800 Subject: [PATCH 16/21] Improve test cases, and fix some bugs (#1920) * Improve emqx_banned, emqx_pqueue, emqx_router test cases * Improve emqx_broker test case, and fix bug in emqx_broker * Add emqx_hooks to CT_SUITES --- Makefile | 2 +- src/emqx_banned.erl | 5 ++++ src/emqx_broker.erl | 6 ++-- test/emqx_banned_SUITE.erl | 16 +++++++---- test/emqx_broker_SUITE.erl | 13 +++++++-- test/emqx_pqueue_SUITE.erl | 56 +++++++++++++++++++++++++++++++++++++- test/emqx_router_SUITE.erl | 20 ++++++++++---- 7 files changed, 100 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 90a46b9cd..23a7da11c 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 8f1c3156f..175271306 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -102,8 +102,13 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +-ifdef(TEST). +ensure_expiry_timer(State) -> + State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}. +-else. ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. +-endif. expire_banned_items(Now) -> mnesia:foldl(fun diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index f0946e92d..0828d6be3 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -260,9 +260,11 @@ subscription(Topic, Subscriber) -> -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1; + {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1), + length(Match) >= 1; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1; + {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1), + length(Match) >= 1; subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index c91aeae45..9d4c85134 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -29,13 +29,17 @@ t_banned_all(_) -> emqx_ct_broker_helpers:run_setup_steps(), emqx_banned:start_link(), TimeNow = erlang:system_time(second), - ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>}, - reason = <<"test">>, - by = <<"banned suite">>, - desc = <<"test">>, - until = TimeNow + 10}), + Banned = #banned{who = {client_id, <<"TestClient">>}, + reason = <<"test">>, + by = <<"banned suite">>, + desc = <<"test">>, + until = TimeNow + 1}, + ok = emqx_banned:add(Banned), % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed - timer:sleep(100), + ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + timer:sleep(2500), + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + ok = emqx_banned:add(Banned), ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), emqx_banned:del({client_id, <<"TestClient">>}), ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 7baa248f3..7fcc2a598 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -60,6 +60,11 @@ subscribe_unsubscribe(_) -> ok = emqx:subscribe(<<"topic">>, <<"clientId">>), ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }), ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), + true = emqx:subscribed(<<"topic">>, <<"clientId">>), + Topics = emqx:topics(), + lists:foreach(fun(Topic) -> + ?assert(lists:member(Topic, Topics)) + end, Topics), ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). @@ -72,12 +77,16 @@ publish(_) -> ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). pubsub(_) -> + true = emqx:is_running(node()), Self = self(), Subscriber = {Self, <<"clientId">>}, ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }), - #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber), + true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}), + #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber), ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }), - #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber), diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index e610a7639..e7672cb0b 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -22,7 +22,7 @@ -define(PQ, emqx_pqueue). -all() -> [t_priority_queue_plen, t_priority_queue_out2]. +all() -> [t_priority_queue_plen, t_priority_queue_out2, t_priority_queues]. t_priority_queue_plen(_) -> Q = ?PQ:new(), @@ -67,3 +67,57 @@ t_priority_queue_out2(_) -> {Val5, Q6} = ?PQ:out(Q5), {value, a} = Val5, {empty, _Q7} = ?PQ:out(Q6). + +t_priority_queues(_) -> + Q0 = ?PQ:new(), + Q1 = ?PQ:new(), + PQueue = {pqueue, [{0, Q0}, {1, Q1}]}, + ?assert(?PQ:is_queue(PQueue)), + [] = ?PQ:to_list(PQueue), + + PQueue1 = ?PQ:in(a, 0, ?PQ:new()), + PQueue2 = ?PQ:in(b, 0, PQueue1), + + PQueue3 = ?PQ:in(c, 1, PQueue2), + PQueue4 = ?PQ:in(d, 1, PQueue3), + + 4 = ?PQ:len(PQueue4), + + [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4), + PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]), + + empty = ?PQ:highest(?PQ:new()), + 0 = ?PQ:highest(PQueue1), + 1 = ?PQ:highest(PQueue4), + + PQueue5 = ?PQ:in(e, infinity, PQueue4), + PQueue6 = ?PQ:in(f, 1, PQueue5), + + {{value, e}, PQueue7} = ?PQ:out(PQueue6), + {empty, _} = ?PQ:out(0, ?PQ:new()), + + {empty, Q0} = ?PQ:out_p(Q0), + + Q2 = ?PQ:in(a, Q0), + Q3 = ?PQ:in(b, Q2), + Q4 = ?PQ:in(c, Q3), + + {{value, a, 0}, _Q5} = ?PQ:out_p(Q4), + + {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7), + + Q4 = ?PQ:join(Q4, ?PQ:new()), + Q4 = ?PQ:join(?PQ:new(), Q4), + + {queue, [a], [a], 2} = ?PQ:join(Q2, Q2), + + {pqueue,[{-1,{queue,[f],[d],2}}, + {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2), + + {pqueue,[{-1,{queue,[f],[d],2}}, + {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8), + + {pqueue,[{-1,{queue,[f],[d,f,d],4}}, + {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8). + + diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index 196b1678e..a35da9c5d 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -29,7 +29,9 @@ all() -> groups() -> [{route, [sequence], [add_del_route, - match_routes]}]. + match_routes, + has_routes, + router_add_del]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -81,6 +83,7 @@ match_routes(_) -> has_routes(_) -> From = {self(), make_ref()}, ?R:add_route(From, <<"devices/+/messages">>, node()), + timer:sleep(200), ?assert(?R:has_routes(<<"devices/+/messages">>)). clear_tables() -> @@ -88,28 +91,33 @@ clear_tables() -> router_add_del(_) -> ?R:add_route(<<"#">>), - ?R:add_route(<<"a/b/c">>), + ?R:add_route(<<"a/b/c">>, node()), ?R:add_route(<<"+/#">>), Routes = [R1, R2 | _] = [ #route{topic = <<"#">>, dest = node()}, #route{topic = <<"+/#">>, dest = node()}, #route{topic = <<"a/b/c">>, dest = node()}], + timer:sleep(500), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), + ?R:print_routes(<<"a/b/c">>), + %% Batch Add lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), %% Del - ?R:del_route(<<"a/b/c">>), - [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), + ?R:del_route(<<"a/b/c">>, node()), + timer:sleep(500), + [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)), {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), %% Batch Del R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'}, ?R:add_route(R3), - ?R:del_route(R1), + ?R:del_route(<<"#">>), ?R:del_route(R2), ?R:del_route(R3), - [] = lists:sort(?R:match(<<"a/b/c">>)). + timer:sleep(500), + [] = lists:sort(?R:match_routes(<<"a/b/c">>)). From f7285d5a587915fa78037c03bf088d8704cd9060 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 26 Oct 2018 17:30:39 +0800 Subject: [PATCH 17/21] Delete ackprops in pstate Prior to this change, ackprops is duplicated with ack_props This change delete ackprops. --- src/emqx_protocol.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index fdec62d8b..4a784252b 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -39,7 +39,6 @@ peercert, proto_ver, proto_name, - ackprops, client_id, is_assigned, conn_pid, @@ -606,10 +605,10 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun %%------------------------------------------------------------------------------ %% Assign a clientid -maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> +maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1}; + PState#pstate{client_id = ClientId, is_assigned = true, ack_props = AckProps1}; maybe_assign_client_id(PState) -> PState. From 92251d4a8a497bb29fb48293cad2b6e26c3e34f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 26 Oct 2018 17:45:13 +0800 Subject: [PATCH 18/21] Make more normalize --- src/emqx_protocol.erl | 6 +++--- src/emqx_session.erl | 2 +- test/emqx_keepalive_SUITE.erl | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index d981309e4..e3ce36bd5 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -631,14 +631,14 @@ set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> get_property('Receive-Maximum', ConnProps, 65535); - true -> + true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0); - true -> + true -> case CleanStart of true -> 0; false -> @@ -649,7 +649,7 @@ set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVe maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> get_property('Topic-Alias-Maximum', ConnProps, 0); - true -> + true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); set_session_attrs({_, #pstate{}}, SessAttrs) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 224b2be82..e123516e9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -311,7 +311,7 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(spid(), pid(), emqx:message()) -> ok). +-spec(resume(spid(), pid(), emqx:message() | undefined) -> ok). resume(SPid, ConnPid, WillMsg) -> gen_server:cast(SPid, {resume, ConnPid, WillMsg}). diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index 60472fd42..c4dbd80f2 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -26,7 +26,6 @@ groups() -> [{keepalive, [], [t_keepalive]}]. %%-------------------------------------------------------------------- t_keepalive(_) -> - {ok, _} = emqx_keepalive:start(fun() -> {ok, 1} end, 0, {keepalive, timeout}), {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). From 881e1a962121bd8d63d9548b71a9b27cb008ff63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 26 Oct 2018 19:43:05 +0800 Subject: [PATCH 19/21] Add case for ets:match_object --- src/emqx_broker.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 0828d6be3..857090c25 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -260,11 +260,19 @@ subscription(Topic, Subscriber) -> -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1), - length(Match) >= 1; + case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of + {Match, _} -> + length(Match) >= 1; + '$end_of_table' -> + false + end; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1), - length(Match) >= 1; + case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of + {Match, _} -> + length(Match) >= 1; + '$end_of_table' -> + false + end; subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). From 7c14ba11d6cb0c4077e19c3220846200423b972d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Sat, 27 Oct 2018 13:59:17 +0800 Subject: [PATCH 20/21] Set some attributes when session resumed --- src/emqx_session.erl | 25 ++++++++++++++----------- src/emqx_sm.erl | 6 +++--- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e123516e9..4fb92cdce 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -46,7 +46,7 @@ -export([start_link/1]). -export([info/1, attrs/1]). -export([stats/1]). --export([resume/3, discard/2]). +-export([resume/2, discard/2]). -export([update_expiry_interval/2, update_misc/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). @@ -311,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(spid(), pid(), emqx:message() | undefined) -> ok). -resume(SPid, ConnPid, WillMsg) -> - gen_server:cast(SPid, {resume, ConnPid, WillMsg}). +-spec(resume(spid(), map()) -> ok). +resume(SPid, SessAttrs) -> + gen_server:cast(SPid, {resume, SessAttrs}). %% @doc Discard the session -spec(discard(spid(), ByPid :: pid()) -> ok). @@ -517,13 +517,15 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = ClientId, - conn_pid = OldConnPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer, - will_delay_timer = WillDelayTimer}) -> +handle_cast({resume, #{conn_pid := ConnPid, + will_msg := WillMsg, + expiry_interval := SessionExpiryInterval}}, State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer, + will_delay_timer = WillDelayTimer}) -> ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), @@ -545,6 +547,7 @@ handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = Client awaiting_rel = #{}, await_rel_timer = undefined, expiry_timer = undefined, + expiry_interval = SessionExpiryInterval, will_delay_timer = undefined, will_msg = WillMsg}, diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 963cab1e4..9eb853ac4 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -93,11 +93,11 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> resume_session(ClientId) -> resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}). -resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) -> +resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> case lookup_session(ClientId) of [] -> {error, not_found}; [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, ConnPid, WillMsg), + ok = emqx_session:resume(SPid, SessAttrs), {ok, SPid}; Sessions -> [{_, SPid}|StaleSessions] = lists:reverse(Sessions), @@ -105,7 +105,7 @@ resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) -> lists:foreach(fun({_, StalePid}) -> catch emqx_session:discard(StalePid, ConnPid) end, StaleSessions), - ok = emqx_session:resume(SPid, ConnPid, WillMsg), + ok = emqx_session:resume(SPid, SessAttrs), {ok, SPid} end. From 3cea06f88f1147d62cac28072224d8fc1479ca25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Sat, 27 Oct 2018 14:25:10 +0800 Subject: [PATCH 21/21] Remove resume_session/1, --- src/emqx_session.erl | 41 +++++++++++++++++++---------------------- src/emqx_sm.erl | 8 ++------ 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 4fb92cdce..286066a59 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,7 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). --export([update_expiry_interval/2, update_misc/2]). +-export([update_expiry_interval/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -324,9 +324,6 @@ discard(SPid, ByPid) -> update_expiry_interval(SPid, Interval) -> gen_server:cast(SPid, {expiry_interval, Interval}). -update_misc(SPid, Misc) -> - gen_server:cast(SPid, {update_misc, Misc}). - -spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -517,9 +514,11 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, #{conn_pid := ConnPid, - will_msg := WillMsg, - expiry_interval := SessionExpiryInterval}}, State = #state{client_id = ClientId, +handle_cast({resume, #{conn_pid := ConnPid, + will_msg := WillMsg, + expiry_interval := SessionExpiryInterval, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId, conn_pid = OldConnPid, clean_start = CleanStart, retry_timer = RetryTimer, @@ -539,17 +538,19 @@ handle_cast({resume, #{conn_pid := ConnPid, true = link(ConnPid), - State1 = State#state{conn_pid = ConnPid, - binding = binding(ConnPid), - old_conn_pid = OldConnPid, - clean_start = false, - retry_timer = undefined, - awaiting_rel = #{}, - await_rel_timer = undefined, - expiry_timer = undefined, - expiry_interval = SessionExpiryInterval, - will_delay_timer = undefined, - will_msg = WillMsg}, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, + clean_start = false, + retry_timer = undefined, + awaiting_rel = #{}, + await_rel_timer = undefined, + expiry_timer = undefined, + expiry_interval = SessionExpiryInterval, + inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + topic_alias_maximum = TopicAliasMaximum, + will_delay_timer = undefined, + will_msg = WillMsg}, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), @@ -562,10 +563,6 @@ handle_cast({resume, #{conn_pid := ConnPid, handle_cast({expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; -handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) -> - {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), - topic_alias_maximum = TopicAliasMaximum}}; - handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 9eb853ac4..bc3f6ff68 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -22,7 +22,7 @@ -export([open_session/1, close_session/1]). -export([lookup_session/1, lookup_session_pid/1]). --export([resume_session/1, resume_session/2]). +-export([resume_session/2]). -export([discard_session/1, discard_session/2]). -export([register_session/2, unregister_session/1]). -export([get_session_attrs/1, set_session_attrs/2]). @@ -66,7 +66,6 @@ open_session(SessAttrs = #{clean_start := false, ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of {ok, SPid} -> - emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}), {ok, SPid, true}; {error, not_found} -> emqx_session_sup:start_session(SessAttrs) @@ -89,10 +88,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> end, lookup_session(ClientId)). %% @doc Try to resume a session. --spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}). -resume_session(ClientId) -> - resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}). - +-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}). resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> case lookup_session(ClientId) of [] -> {error, not_found};