Handle session expiry interval correctly
This commit is contained in:
parent
7052993d00
commit
cff120c6d0
|
@ -325,13 +325,35 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
handle_in(?PACKET(?PINGREQ), Channel) ->
|
handle_in(?PACKET(?PINGREQ), Channel) ->
|
||||||
{ok, ?PACKET(?PINGRESP), Channel};
|
{ok, ?PACKET(?PINGRESP), Channel};
|
||||||
|
|
||||||
handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel) ->
|
handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Session, protocol = Protocol}) ->
|
||||||
%% Clear will msg
|
OldInterval = emqx_session:info(expiry_interval, Session),
|
||||||
{stop, normal, Channel};
|
Interval = maps:get('Session-Expiry-Interval', case Properties of
|
||||||
|
undefined -> #{};
|
||||||
handle_in(?DISCONNECT_PACKET(RC), Channel = #channel{protocol = Protocol}) ->
|
_ -> Properties
|
||||||
Ver = emqx_protocol:info(proto_ver, Protocol),
|
end, OldInterval),
|
||||||
{stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, Channel};
|
case OldInterval =:= 0 andalso Interval =/= OldInterval of
|
||||||
|
true ->
|
||||||
|
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
||||||
|
false ->
|
||||||
|
NChannel = ensure_disconnected(case RC of
|
||||||
|
?RC_SUCCESS ->
|
||||||
|
Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol),
|
||||||
|
session = emqx_session:update_expiry_interval(Interval, Session)};
|
||||||
|
_ -> Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session)}
|
||||||
|
end),
|
||||||
|
case Interval of
|
||||||
|
?UINT_MAX -> {ok, NChannel};
|
||||||
|
Int when Int > 0 -> {ok, ensure_timer(expire_timer, NChannel)};
|
||||||
|
_Other ->
|
||||||
|
Reason = case RC of
|
||||||
|
?RC_SUCCESS -> closed;
|
||||||
|
_ ->
|
||||||
|
Ver = emqx_protocol:info(proto_ver, Protocol),
|
||||||
|
emqx_reason_codes:name(RC, Ver)
|
||||||
|
end,
|
||||||
|
{stop, {shutdown, Reason}, Channel}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
|
||||||
handle_in(?AUTH_PACKET(), Channel) ->
|
handle_in(?AUTH_PACKET(), Channel) ->
|
||||||
%%TODO: implement later.
|
%%TODO: implement later.
|
||||||
|
@ -905,7 +927,6 @@ open_session(#mqtt_packet_connect{clean_start = CleanStart,
|
||||||
#channel{client = Client = #{zone := Zone}, protocol = Protocol}) ->
|
#channel{client = Client = #{zone := Zone}, protocol = Protocol}) ->
|
||||||
MaxInflight = get_property('Receive-Maximum', ConnProps,
|
MaxInflight = get_property('Receive-Maximum', ConnProps,
|
||||||
emqx_zone:get_env(Zone, max_inflight, 65535)),
|
emqx_zone:get_env(Zone, max_inflight, 65535)),
|
||||||
|
|
||||||
Interval =
|
Interval =
|
||||||
case emqx_protocol:info(proto_ver, Protocol) of
|
case emqx_protocol:info(proto_ver, Protocol) of
|
||||||
?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0);
|
?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0);
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
|
|
||||||
-export([ find_alias/2
|
-export([ find_alias/2
|
||||||
, save_alias/3
|
, save_alias/3
|
||||||
|
, clear_will_msg/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([protocol/0]).
|
-export_type([protocol/0]).
|
||||||
|
@ -134,3 +135,5 @@ save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = undefined}) ->
|
||||||
save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) ->
|
save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) ->
|
||||||
Protocol#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}.
|
Protocol#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}.
|
||||||
|
|
||||||
|
clear_will_msg(Protocol) ->
|
||||||
|
Protocol#protocol{will_msg = undefined}.
|
||||||
|
|
|
@ -58,6 +58,8 @@
|
||||||
, stats/1
|
, stats/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([update_expiry_interval/2]).
|
||||||
|
|
||||||
-export([ subscribe/4
|
-export([ subscribe/4
|
||||||
, unsubscribe/3
|
, unsubscribe/3
|
||||||
]).
|
]).
|
||||||
|
@ -218,6 +220,9 @@ info(expiry_interval, #session{expiry_interval = Interval}) ->
|
||||||
info(created_at, #session{created_at = CreatedAt}) ->
|
info(created_at, #session{created_at = CreatedAt}) ->
|
||||||
CreatedAt.
|
CreatedAt.
|
||||||
|
|
||||||
|
update_expiry_interval(ExpiryInterval, Session) ->
|
||||||
|
Session#session{expiry_interval = ExpiryInterval}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Attrs of the session
|
%% Attrs of the session
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue