From 03f607c1b2c1dd97fcd2115c4f7c5ac3169ecf32 Mon Sep 17 00:00:00 2001 From: tigercl Date: Sat, 22 Sep 2018 16:10:24 +0800 Subject: [PATCH] Fix issue#1833 and #1834 (#1845) --- priv/emqx.schema | 2 +- src/emqx_protocol.erl | 41 ++++++++++++++++++++++++++++++--------- src/emqx_session.erl | 36 +++++++++++++++++----------------- test/emqx_mock_client.erl | 12 ++++++------ test/emqx_sm_SUITE.erl | 2 +- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 379a3939e..5f148ac96 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -797,7 +797,7 @@ end}. %% @doc Session Expiry Interval {mapping, "zone.$name.session_expiry_interval", "emqx.zones", [ {default, "2h"}, - {datatype, {duration, ms}} + {datatype, {duration, s}} ]}. %% @doc Type: simple | priority diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..4a2b57ea3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -410,9 +410,17 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> - %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined}}; +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), + PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> + case Interval =/= 0 andalso OldInterval =:= 0 of + true -> + deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), + {error, protocol_error, PState}; + false -> + emqx_session:update_expiry_interval(SPid, Interval), + %% Clean willmsg + {stop, normal, PState#pstate{will_msg = undefined}} + end; process_packet(?DISCONNECT_PACKET(_), PState) -> {stop, normal, PState}. @@ -562,17 +570,32 @@ maybe_assign_client_id(PState) -> PState. try_open_session(#pstate{zone = Zone, + proto_ver = ProtoVer, client_id = ClientId, conn_pid = ConnPid, conn_props = ConnProps, username = Username, clean_start = CleanStart}) -> - case emqx_sm:open_session(#{zone => Zone, - client_id => ClientId, - conn_pid => ConnPid, - username => Username, - clean_start => CleanStart, - conn_props => ConnProps}) of + + SessAttrs = #{ + zone => Zone, + client_id => ClientId, + conn_pid => ConnPid, + username => Username, + clean_start => CleanStart + }, + + case emqx_sm:open_session(maps:put(expiry_interval, if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Session-Expiry-Interval', ConnProps, 0); + true -> + case CleanStart of + true -> + 0; + false -> + emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end + end, SessAttrs)) of {ok, SPid} -> {ok, SPid, false}; Other -> Other diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 0a798f0c4..14ed2a21d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,6 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). +-export([update_expiry_interval/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -313,6 +314,10 @@ resume(SPid, ConnPid) -> discard(SPid, ByPid) -> gen_server:call(SPid, {discard, ByPid}, infinity). +-spec(update_expiry_interval(spid(), timeout()) -> ok). +update_expiry_interval(SPid, Interval) -> + gen_server:cast(SPid, {expiry_interval, Interval * 1000}). + -spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -321,12 +326,12 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([Parent, #{zone := Zone, - client_id := ClientId, - username := Username, - conn_pid := ConnPid, - clean_start := CleanStart, - conn_props := ConnProps}]) -> +init([Parent, #{zone := Zone, + client_id := ClientId, + username := Username, + conn_pid := ConnPid, + clean_start := CleanStart, + expiry_interval := ExpiryInterval}]) -> process_flag(trap_exit, true), true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), @@ -346,7 +351,7 @@ init([Parent, #{zone := Zone, awaiting_rel = #{}, await_rel_timeout = get_env(Zone, await_rel_timeout), max_awaiting_rel = get_env(Zone, max_awaiting_rel), - expiry_interval = expire_interval(Zone, ConnProps), + expiry_interval = ExpiryInterval, enable_stats = get_env(Zone, enable_stats, true), deliver_stats = 0, enqueue_stats = 0, @@ -361,11 +366,6 @@ init([Parent, #{zone := Zone, ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). -expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) -> - I * 1000; -expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1 - get_env(Zone, session_expiry_interval, 0). - init_mqueue(Zone) -> emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple), max_len => get_env(Zone, max_mqueue_len, 1000), @@ -540,6 +540,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1))); +handle_cast({expiry_interval, Interval}, State) -> + {noreply, State#state{expiry_interval = Interval}}; + handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -591,13 +594,10 @@ handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); -handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) -> - {stop, Reason, State#state{conn_pid = undefined}}; - handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> {stop, Reason, State#state{conn_pid = undefined}}; -handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) -> +handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> @@ -876,8 +876,8 @@ ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) -> ensure_retry_timer(_Timeout, State) -> State. -ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 -> - State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)}; +ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 andalso Interval =/= 16#ffffffff -> + State#state{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)}; ensure_expire_timer(State) -> State. diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 85114e3a9..633e42b3f 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -53,12 +53,12 @@ init([ClientId]) -> }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> - Attrs = #{ zone => Zone, - client_id => ClientId, - conn_pid => ClientPid, - clean_start => true, - username => undefined, - conn_props => undefined + Attrs = #{ zone => Zone, + client_id => ClientId, + conn_pid => ClientPid, + clean_start => true, + username => undefined, + expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, State#state{ diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 5e23100a9..6f9b92399 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -25,7 +25,7 @@ t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, conn_props => #{}}, + zone => internal, username => <<"zhou">>, expiry_interval => 0}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>),