diff --git a/apps/emqx/include/emqx_session_mem.hrl b/apps/emqx/include/emqx_session_mem.hrl index ae155f766..88901a541 100644 --- a/apps/emqx/include/emqx_session_mem.hrl +++ b/apps/emqx/include/emqx_session_mem.hrl @@ -26,7 +26,7 @@ %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed - max_subscriptions = infinity :: non_neg_integer() | infinity, + max_subscriptions :: non_neg_integer() | infinity, %% Upgrade QoS? upgrade_qos = false :: boolean(), %% Client <- Broker: QoS1/2 messages sent to the client but @@ -40,14 +40,14 @@ %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval = 0 :: timeout(), + retry_interval :: timeout(), %% Client -> Broker: QoS2 messages received from the client, but %% have not been completely acknowledged awaiting_rel :: map(), %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel = infinity :: non_neg_integer() | infinity, + max_awaiting_rel :: non_neg_integer() | infinity, %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout = 0 :: timeout(), + await_rel_timeout :: timeout(), %% Created at created_at :: pos_integer() }). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 1d626fdb0..c8affdaea 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -158,7 +158,7 @@ create( Conf ) -> QueueOpts = get_mqueue_conf(Zone), - Session = #session{ + #session{ id = emqx_guid:gen(), clientid = ClientId, created_at = erlang:system_time(millisecond), @@ -167,9 +167,13 @@ create( inflight = emqx_inflight:new(ReceiveMax), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - awaiting_rel = #{} - }, - preserve_conf(Conf, Session). + awaiting_rel = #{}, + max_subscriptions = maps:get(max_subscriptions, Conf), + max_awaiting_rel = maps:get(max_awaiting_rel, Conf), + upgrade_qos = maps:get(upgrade_qos, Conf), + retry_interval = maps:get(retry_interval, Conf), + await_rel_timeout = maps:get(await_rel_timeout, Conf) + }. get_mqueue_conf(Zone) -> #{ @@ -204,7 +208,7 @@ open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> case emqx_cm:takeover_session_end(TakeoverState) of {ok, Pendings} -> Session1 = resize_inflight(ConnInfo, Session0), - Session = preserve_conf(Conf, Session1), + Session = apply_conf(Conf, Session1), clean_session(ClientInfo, Session, Pendings); {error, _} -> % TODO log error? @@ -219,7 +223,7 @@ resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = inflight = emqx_inflight:resize(ReceiveMax, Inflight) }. -preserve_conf(Conf, Session = #session{}) -> +apply_conf(Conf, Session = #session{}) -> Session#session{ max_subscriptions = maps:get(max_subscriptions, Conf), max_awaiting_rel = maps:get(max_awaiting_rel, Conf),