From c69a2b1b485ad555ea02f55c9e48bdba24b6c769 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 23 Aug 2019 09:35:29 +0800 Subject: [PATCH] Ensure session expiration (#2825) Ensure session expiration --- include/emqx_mqtt.hrl | 2 ++ src/emqx_channel.erl | 52 +++++++++++++++++++++++++++++-------- src/emqx_connection.erl | 12 ++++----- src/emqx_session.erl | 2 +- test/emqx_session_SUITE.erl | 24 +++-------------- 5 files changed, 53 insertions(+), 39 deletions(-) diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index cbffe27ee..3a138f393 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -17,6 +17,8 @@ -ifndef(EMQ_X_MQTT_HRL). -define(EMQ_X_MQTT_HRL, true). +-define(UINT_MAX, 16#FFFFFFFF). + %%-------------------------------------------------------------------- %% MQTT SockOpts %%-------------------------------------------------------------------- diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c1ae31e1d..84864fd07 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -74,6 +74,7 @@ %% Connected connected :: boolean(), connected_at :: erlang:timestamp(), + disconnected_at :: erlang:timestamp(), %% Takeover/Resume resuming :: boolean(), pendings :: list() @@ -169,7 +170,9 @@ info(oom_policy, #channel{oom_policy = Policy}) -> info(connected, #channel{connected = Connected}) -> Connected; info(connected_at, #channel{connected_at = ConnectedAt}) -> - ConnectedAt. + ConnectedAt; +info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) -> + DisconnectedAt. -spec(attrs(channel()) -> emqx_types:attrs()). attrs(#channel{client = Client, @@ -240,11 +243,12 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{pro ProtoVer = emqx_protocol:info(proto_ver, Protocol), ?LOG(warning, "Cannot publish message to ~s due to ~s", [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]), - case QoS of - ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel); - ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel); - ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel) - end + handle_out({disconnect, ReasonCode}, NChannel) + % case QoS of + % ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel); + % ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel); + % ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel) + % end end; %%TODO: How to handle the ReasonCode? @@ -589,6 +593,18 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; +handle_info(sock_closed, Channel = #channel{connected = false}) -> + shutdown(closed, Channel); +handle_info(sock_closed, Channel = #channel{session = Session}) -> + Interval = emqx_session:info(expiry_interval, Session), + case Interval of + ?UINT_MAX -> + {ok, ensure_disconnected(Channel)}; + Int when Int > 0 -> + {ok, ensure_timer(expire_timer, ensure_disconnected(Channel))}; + _Other -> shutdown(closed, Channel) + end; + handle_info(Info, Channel) -> ?LOG(error, "Unexpected info: ~p~n", [Info]), {ok, Channel}. @@ -643,7 +659,7 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session, end; timeout(_TRef, expire_session, Channel) -> - {ok, Channel}; + shutdown(expired, Channel); timeout(_TRef, Msg, Channel) -> ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), @@ -685,7 +701,7 @@ interval(retry_timer, #channel{session = Session}) -> interval(await_timer, #channel{session = Session}) -> emqx_session:info(await_rel_timeout, Session); interval(expire_timer, #channel{session = Session}) -> - emqx_session:info(expiry_interval, Session). + timer:seconds(emqx_session:info(expiry_interval, Session)). %%-------------------------------------------------------------------- %% Terminate @@ -886,11 +902,19 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId, open_session(#mqtt_packet_connect{clean_start = CleanStart, properties = ConnProps}, - #channel{client = Client = #{zone := Zone}}) -> + #channel{client = Client = #{zone := Zone}, protocol = Protocol}) -> MaxInflight = get_property('Receive-Maximum', ConnProps, emqx_zone:get_env(Zone, max_inflight, 65535)), - Interval = get_property('Session-Expiry-Interval', ConnProps, - emqx_zone:get_env(Zone, session_expiry_interval, 0)), + + Interval = + case emqx_protocol:info(proto_ver, Protocol) of + ?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0); + _ -> + case CleanStart of + true -> 0; + false -> emqx_zone:get_env(Zone, session_expiry_interval, 0) + end + end, emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight, expiry_interval => Interval }). @@ -1034,6 +1058,9 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId}, ensure_connected(Channel) -> Channel#channel{connected = true, connected_at = os:timestamp()}. +ensure_disconnected(Channel) -> + Channel#channel{connected = false, disconnected_at = os:timestamp()}. + ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel); ensure_keepalive(_AckProp, Channel = #channel{protocol = Protocol}) -> @@ -1111,3 +1138,6 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. +shutdown(Reason, Channel) -> + {stop, {shutdown, Reason}, Channel}. + diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index f6cd96108..b6cdf5f8b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -341,15 +341,13 @@ handle(info, {Error, _Sock, Reason}, State) when Error == tcp_error; Error == ssl_error -> shutdown(Reason, State); -%%TODO: fixme later. handle(info, {Closed, _Sock}, State = #state{chan_state = ChanState}) when Closed == tcp_closed; Closed == ssl_closed -> - case emqx_channel:info(protocol, ChanState) of - undefined -> shutdown(closed, State); - #{clean_start := true} -> - shutdown(closed, State); - #{clean_start := false} -> - {next_state, disconnected, State} + case emqx_channel:handle_info(sock_closed, ChanState) of + {ok, NChanState} -> + {next_state, disconnected, State#state{chan_state = NChanState}}; + {stop, Reason, NChanState} -> + stop(Reason, State#state{chan_state = NChanState}) end; handle(info, {Passive, _Sock}, State) when Passive == tcp_passive; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 98afade52..32a782ce0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -214,7 +214,7 @@ info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(expiry_interval, #session{expiry_interval = Interval}) -> - Interval div 1000; + Interval; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index c142284c1..9f537ff19 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -73,12 +73,12 @@ apply_ops(Session, [Op | Rest]) -> apply_op(Session, info) -> Info = emqx_session:info(Session), ?assert(is_map(Info)), - ?assertEqual(16, maps:size(Info)), + ?assertEqual(15, maps:size(Info)), Session; apply_op(Session, attrs) -> Attrs = emqx_session:attrs(Session), ?assert(is_map(Attrs)), - ?assertEqual(3, maps:size(Attrs)), + ?assertEqual(2, maps:size(Attrs)), Session; apply_op(Session, stats) -> Stats = emqx_session:stats(Session), @@ -145,14 +145,7 @@ apply_op(Session, {pubcomp, PacketId}) -> end; apply_op(Session, {deliver, Delivers}) -> {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session), - NSession; -apply_op(Session, {timeout, {TRef, TimeoutMsg}}) -> - case emqx_session:timeout(TRef, TimeoutMsg, Session) of - {ok, NSession} -> - NSession; - {ok, _Msg, NSession} -> - NSession - end. + NSession. %%%%%%%%%%%%%%%%%% %%% Generators %%% @@ -169,17 +162,13 @@ session_op_list() -> {pubrec, pubrec_args()}, {pubrel, pubrel_args()}, {pubcomp, pubcomp_args()}, - {deliver, deliver_args()}, - {timeout, timeout_args()} + {deliver, deliver_args()} ], list(?LAZY(oneof(Union))). deliver_args() -> list({deliver, topic(), message()}). -timeout_args() -> - {tref(), timeout_msg()}. - info_args() -> oneof([subscriptions, max_subscriptions, @@ -225,11 +214,6 @@ pubrel_args() -> pubcomp_args() -> packetid(). -timeout_msg() -> - oneof([retry_delivery, check_awaiting_rel]). - -tref() -> oneof([tref, undefined]). - sub_opts() -> ?LET({RH, RAP, NL, QOS, SHARE, SUBID}, {rh(), rap(), nl(), qos(), share(), subid()}