diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 77bc44eeb..a26d35969 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -67,7 +67,8 @@ %% Test/debug interface -export([ all_channels/0, - all_client_ids/0 + all_client_ids/0, + get_session_confs/2 ]). %% gen_server callbacks @@ -355,6 +356,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{ max_inflight => MaxInflight, retry_interval => get_mqtt_conf(Zone, retry_interval), await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout), + max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel), mqueue => mqueue_confs(Zone), %% TODO: Add conf for allowing/disallowing persistent sessions. %% Note that the connection info is already enriched to have diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 2e17190e2..b3a8ecebc 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -200,7 +200,7 @@ -spec init(options()) -> session(). init(Opts) -> - MaxInflight = maps:get(max_inflight, Opts, 1), + MaxInflight = maps:get(max_inflight, Opts), QueueOpts = maps:merge( #{ max_len => 1000, @@ -211,17 +211,17 @@ init(Opts) -> #session{ id = emqx_guid:gen(), clientid = maps:get(clientid, Opts, <<>>), - is_persistent = maps:get(is_persistent, Opts, false), - max_subscriptions = maps:get(max_subscriptions, Opts, infinity), + is_persistent = maps:get(is_persistent, Opts), + max_subscriptions = maps:get(max_subscriptions, Opts), subscriptions = #{}, - upgrade_qos = maps:get(upgrade_qos, Opts, false), + upgrade_qos = maps:get(upgrade_qos, Opts), inflight = emqx_inflight:new(MaxInflight), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - retry_interval = maps:get(retry_interval, Opts, 30000), + retry_interval = maps:get(retry_interval, Opts), awaiting_rel = #{}, - max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), - await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), + max_awaiting_rel = maps:get(max_awaiting_rel, Opts), + await_rel_timeout = maps:get(await_rel_timeout, Opts), created_at = erlang:system_time(millisecond) }. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index c6610c0e2..c3f27269b 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -1236,11 +1236,17 @@ connpkt(Props) -> session() -> session(#{}). session(InitFields) when is_map(InitFields) -> + Conf = emqx_cm:get_session_confs( + #{zone => default, clientid => <<"fake-test">>}, #{ + receive_maximum => 0, expiry_interval => 0 + } + ), + Session = emqx_session:init(Conf), maps:fold( fun(Field, Value, Session) -> emqx_session:set_field(Field, Value, Session) end, - emqx_session:init(#{max_inflight => 0}), + Session, InitFields ). diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 23ddf4008..cc9e03168 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -673,7 +673,10 @@ channel(InitFields) -> peercert => undefined, mountpoint => undefined }, - Session = emqx_session:init(#{max_inflight => 0}), + Conf = emqx_cm:get_session_confs(ClientInfo, #{ + receive_maximum => 0, expiry_interval => 1000 + }), + Session = emqx_session:init(Conf), maps:fold( fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index ecc9794d1..95d94707c 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -63,7 +63,12 @@ end_per_testcase(_TestCase, Config) -> %%-------------------------------------------------------------------- t_session_init(_) -> - Session = emqx_session:init(#{max_inflight => 64}), + Conf = emqx_cm:get_session_confs( + #{zone => default, clientid => <<"fake-test">>}, #{ + receive_maximum => 64, expiry_interval => 0 + } + ), + Session = emqx_session:init(Conf), ?assertEqual(#{}, emqx_session:info(subscriptions, Session)), ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)), ?assertEqual(infinity, emqx_session:info(subscriptions_max, Session)), @@ -459,11 +464,17 @@ mqueue(Opts) -> session() -> session(#{}). session(InitFields) when is_map(InitFields) -> + Conf = emqx_cm:get_session_confs( + #{zone => default, clientid => <<"fake-test">>}, #{ + receive_maximum => 0, expiry_interval => 0 + } + ), + Session = emqx_session:init(Conf), maps:fold( fun(Field, Value, Session) -> emqx_session:set_field(Field, Value, Session) end, - emqx_session:init(#{max_inflight => 0}), + Session, InitFields ). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 787491c4b..de8b1c9af 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -612,7 +612,10 @@ channel(InitFields) -> peercert => undefined, mountpoint => undefined }, - Session = emqx_session:init(#{max_inflight => 0}), + Conf = emqx_cm:get_session_confs(ClientInfo, #{ + receive_maximum => 0, expiry_interval => 0 + }), + Session = emqx_session:init(Conf), maps:fold( fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 29dce90ee..23d07113c 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -389,7 +389,12 @@ process_connect( clientinfo = ClientInfo } ) -> - SessFun = fun(_, _) -> emqx_session:init(#{max_inflight => 1}) end, + SessFun = fun(ClientInfoT, _) -> + Conf = emqx_cm:get_session_confs( + ClientInfoT, #{receive_maximum => 1, expiry_interval => 0} + ), + emqx_session:init(Conf) + end, case emqx_gateway_ctx:open_session( Ctx,