fix: update max_awaiting_rel in session

This commit is contained in:
Zhongwen Deng 2023-03-02 14:55:02 +08:00
parent 43bedf1d4c
commit f498a3538b
7 changed files with 44 additions and 14 deletions

View File

@ -67,7 +67,8 @@
%% Test/debug interface
-export([
all_channels/0,
all_client_ids/0
all_client_ids/0,
get_session_confs/2
]).
%% gen_server callbacks
@ -355,6 +356,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{
max_inflight => MaxInflight,
retry_interval => get_mqtt_conf(Zone, retry_interval),
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel),
mqueue => mqueue_confs(Zone),
%% TODO: Add conf for allowing/disallowing persistent sessions.
%% Note that the connection info is already enriched to have

View File

@ -200,7 +200,7 @@
-spec init(options()) -> session().
init(Opts) ->
MaxInflight = maps:get(max_inflight, Opts, 1),
MaxInflight = maps:get(max_inflight, Opts),
QueueOpts = maps:merge(
#{
max_len => 1000,
@ -211,17 +211,17 @@ init(Opts) ->
#session{
id = emqx_guid:gen(),
clientid = maps:get(clientid, Opts, <<>>),
is_persistent = maps:get(is_persistent, Opts, false),
max_subscriptions = maps:get(max_subscriptions, Opts, infinity),
is_persistent = maps:get(is_persistent, Opts),
max_subscriptions = maps:get(max_subscriptions, Opts),
subscriptions = #{},
upgrade_qos = maps:get(upgrade_qos, Opts, false),
upgrade_qos = maps:get(upgrade_qos, Opts),
inflight = emqx_inflight:new(MaxInflight),
mqueue = emqx_mqueue:init(QueueOpts),
next_pkt_id = 1,
retry_interval = maps:get(retry_interval, Opts, 30000),
retry_interval = maps:get(retry_interval, Opts),
awaiting_rel = #{},
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
max_awaiting_rel = maps:get(max_awaiting_rel, Opts),
await_rel_timeout = maps:get(await_rel_timeout, Opts),
created_at = erlang:system_time(millisecond)
}.

View File

@ -1236,11 +1236,17 @@ connpkt(Props) ->
session() -> session(#{}).
session(InitFields) when is_map(InitFields) ->
Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 0, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
maps:fold(
fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session)
end,
emqx_session:init(#{max_inflight => 0}),
Session,
InitFields
).

View File

@ -673,7 +673,10 @@ channel(InitFields) ->
peercert => undefined,
mountpoint => undefined
},
Session = emqx_session:init(#{max_inflight => 0}),
Conf = emqx_cm:get_session_confs(ClientInfo, #{
receive_maximum => 0, expiry_interval => 1000
}),
Session = emqx_session:init(Conf),
maps:fold(
fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel)

View File

@ -63,7 +63,12 @@ end_per_testcase(_TestCase, Config) ->
%%--------------------------------------------------------------------
t_session_init(_) ->
Session = emqx_session:init(#{max_inflight => 64}),
Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 64, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
?assertEqual(infinity, emqx_session:info(subscriptions_max, Session)),
@ -459,11 +464,17 @@ mqueue(Opts) ->
session() -> session(#{}).
session(InitFields) when is_map(InitFields) ->
Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 0, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
maps:fold(
fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session)
end,
emqx_session:init(#{max_inflight => 0}),
Session,
InitFields
).

View File

@ -612,7 +612,10 @@ channel(InitFields) ->
peercert => undefined,
mountpoint => undefined
},
Session = emqx_session:init(#{max_inflight => 0}),
Conf = emqx_cm:get_session_confs(ClientInfo, #{
receive_maximum => 0, expiry_interval => 0
}),
Session = emqx_session:init(Conf),
maps:fold(
fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel)

View File

@ -389,7 +389,12 @@ process_connect(
clientinfo = ClientInfo
}
) ->
SessFun = fun(_, _) -> emqx_session:init(#{max_inflight => 1}) end,
SessFun = fun(ClientInfoT, _) ->
Conf = emqx_cm:get_session_confs(
ClientInfoT, #{receive_maximum => 1, expiry_interval => 0}
),
emqx_session:init(Conf)
end,
case
emqx_gateway_ctx:open_session(
Ctx,