diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 84864fd07..137770c73 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -325,13 +325,35 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel) -> - %% Clear will msg - {stop, normal, Channel}; - -handle_in(?DISCONNECT_PACKET(RC), Channel = #channel{protocol = Protocol}) -> - Ver = emqx_protocol:info(proto_ver, Protocol), - {stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, Channel}; +handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Session, protocol = Protocol}) -> + OldInterval = emqx_session:info(expiry_interval, Session), + Interval = maps:get('Session-Expiry-Interval', case Properties of + undefined -> #{}; + _ -> Properties + end, OldInterval), + case OldInterval =:= 0 andalso Interval =/= OldInterval of + true -> + handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); + false -> + NChannel = ensure_disconnected(case RC of + ?RC_SUCCESS -> + Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol), + session = emqx_session:update_expiry_interval(Interval, Session)}; + _ -> Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session)} + end), + case Interval of + ?UINT_MAX -> {ok, NChannel}; + Int when Int > 0 -> {ok, ensure_timer(expire_timer, NChannel)}; + _Other -> + Reason = case RC of + ?RC_SUCCESS -> closed; + _ -> + Ver = emqx_protocol:info(proto_ver, Protocol), + emqx_reason_codes:name(RC, Ver) + end, + {stop, {shutdown, Reason}, Channel} + end + end; handle_in(?AUTH_PACKET(), Channel) -> %%TODO: implement later. @@ -905,7 +927,6 @@ open_session(#mqtt_packet_connect{clean_start = CleanStart, #channel{client = Client = #{zone := Zone}, protocol = Protocol}) -> MaxInflight = get_property('Receive-Maximum', ConnProps, emqx_zone:get_env(Zone, max_inflight, 65535)), - Interval = case emqx_protocol:info(proto_ver, Protocol) of ?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0); diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ebd59106d..8bb5df7bf 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -28,6 +28,7 @@ -export([ find_alias/2 , save_alias/3 + , clear_will_msg/1 ]). -export_type([protocol/0]). @@ -134,3 +135,5 @@ save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = undefined}) -> save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) -> Protocol#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}. +clear_will_msg(Protocol) -> + Protocol#protocol{will_msg = undefined}. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 32a782ce0..6f2df1b8c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -58,6 +58,8 @@ , stats/1 ]). +-export([update_expiry_interval/2]). + -export([ subscribe/4 , unsubscribe/3 ]). @@ -218,6 +220,9 @@ info(expiry_interval, #session{expiry_interval = Interval}) -> info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. +update_expiry_interval(ExpiryInterval, Session) -> + Session#session{expiry_interval = ExpiryInterval}. + %%-------------------------------------------------------------------- %% Attrs of the session %%--------------------------------------------------------------------