Merge pull request #10055 from zhongwencool/fix-max-awaiting-rel

fix: update max_awaiting_rel in session
This commit is contained in:
zhongwencool 2023-03-07 18:37:20 +08:00 committed by GitHub
commit 0a5b221984
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 46 additions and 14 deletions

View File

@ -67,7 +67,8 @@
%% Test/debug interface %% Test/debug interface
-export([ -export([
all_channels/0, all_channels/0,
all_client_ids/0 all_client_ids/0,
get_session_confs/2
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -355,6 +356,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{
max_inflight => MaxInflight, max_inflight => MaxInflight,
retry_interval => get_mqtt_conf(Zone, retry_interval), retry_interval => get_mqtt_conf(Zone, retry_interval),
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout), await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel),
mqueue => mqueue_confs(Zone), mqueue => mqueue_confs(Zone),
%% TODO: Add conf for allowing/disallowing persistent sessions. %% TODO: Add conf for allowing/disallowing persistent sessions.
%% Note that the connection info is already enriched to have %% Note that the connection info is already enriched to have

View File

@ -200,7 +200,7 @@
-spec init(options()) -> session(). -spec init(options()) -> session().
init(Opts) -> init(Opts) ->
MaxInflight = maps:get(max_inflight, Opts, 1), MaxInflight = maps:get(max_inflight, Opts),
QueueOpts = maps:merge( QueueOpts = maps:merge(
#{ #{
max_len => 1000, max_len => 1000,
@ -211,17 +211,17 @@ init(Opts) ->
#session{ #session{
id = emqx_guid:gen(), id = emqx_guid:gen(),
clientid = maps:get(clientid, Opts, <<>>), clientid = maps:get(clientid, Opts, <<>>),
is_persistent = maps:get(is_persistent, Opts, false), is_persistent = maps:get(is_persistent, Opts),
max_subscriptions = maps:get(max_subscriptions, Opts, infinity), max_subscriptions = maps:get(max_subscriptions, Opts),
subscriptions = #{}, subscriptions = #{},
upgrade_qos = maps:get(upgrade_qos, Opts, false), upgrade_qos = maps:get(upgrade_qos, Opts),
inflight = emqx_inflight:new(MaxInflight), inflight = emqx_inflight:new(MaxInflight),
mqueue = emqx_mqueue:init(QueueOpts), mqueue = emqx_mqueue:init(QueueOpts),
next_pkt_id = 1, next_pkt_id = 1,
retry_interval = maps:get(retry_interval, Opts, 30000), retry_interval = maps:get(retry_interval, Opts),
awaiting_rel = #{}, awaiting_rel = #{},
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), max_awaiting_rel = maps:get(max_awaiting_rel, Opts),
await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), await_rel_timeout = maps:get(await_rel_timeout, Opts),
created_at = erlang:system_time(millisecond) created_at = erlang:system_time(millisecond)
}. }.

View File

@ -1236,11 +1236,17 @@ connpkt(Props) ->
session() -> session(#{}). session() -> session(#{}).
session(InitFields) when is_map(InitFields) -> session(InitFields) when is_map(InitFields) ->
Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 0, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
maps:fold( maps:fold(
fun(Field, Value, Session) -> fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session) emqx_session:set_field(Field, Value, Session)
end, end,
emqx_session:init(#{max_inflight => 0}), Session,
InitFields InitFields
). ).

View File

@ -673,7 +673,10 @@ channel(InitFields) ->
peercert => undefined, peercert => undefined,
mountpoint => undefined mountpoint => undefined
}, },
Session = emqx_session:init(#{max_inflight => 0}), Conf = emqx_cm:get_session_confs(ClientInfo, #{
receive_maximum => 0, expiry_interval => 1000
}),
Session = emqx_session:init(Conf),
maps:fold( maps:fold(
fun(Field, Value, Channel) -> fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel) emqx_channel:set_field(Field, Value, Channel)

View File

@ -63,7 +63,12 @@ end_per_testcase(_TestCase, Config) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_session_init(_) -> t_session_init(_) ->
Session = emqx_session:init(#{max_inflight => 64}), Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 64, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
?assertEqual(#{}, emqx_session:info(subscriptions, Session)), ?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)), ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
?assertEqual(infinity, emqx_session:info(subscriptions_max, Session)), ?assertEqual(infinity, emqx_session:info(subscriptions_max, Session)),
@ -459,11 +464,17 @@ mqueue(Opts) ->
session() -> session(#{}). session() -> session(#{}).
session(InitFields) when is_map(InitFields) -> session(InitFields) when is_map(InitFields) ->
Conf = emqx_cm:get_session_confs(
#{zone => default, clientid => <<"fake-test">>}, #{
receive_maximum => 0, expiry_interval => 0
}
),
Session = emqx_session:init(Conf),
maps:fold( maps:fold(
fun(Field, Value, Session) -> fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session) emqx_session:set_field(Field, Value, Session)
end, end,
emqx_session:init(#{max_inflight => 0}), Session,
InitFields InitFields
). ).

View File

@ -612,7 +612,10 @@ channel(InitFields) ->
peercert => undefined, peercert => undefined,
mountpoint => undefined mountpoint => undefined
}, },
Session = emqx_session:init(#{max_inflight => 0}), Conf = emqx_cm:get_session_confs(ClientInfo, #{
receive_maximum => 0, expiry_interval => 0
}),
Session = emqx_session:init(Conf),
maps:fold( maps:fold(
fun(Field, Value, Channel) -> fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel) emqx_channel:set_field(Field, Value, Channel)

View File

@ -389,7 +389,12 @@ process_connect(
clientinfo = ClientInfo clientinfo = ClientInfo
} }
) -> ) ->
SessFun = fun(_, _) -> emqx_session:init(#{max_inflight => 1}) end, SessFun = fun(ClientInfoT, _) ->
Conf = emqx_cm:get_session_confs(
ClientInfoT, #{receive_maximum => 1, expiry_interval => 0}
),
emqx_session:init(Conf)
end,
case case
emqx_gateway_ctx:open_session( emqx_gateway_ctx:open_session(
Ctx, Ctx,

View File

@ -0,0 +1 @@
Fix `mqtt.max_awaiting_rel` change does not work.

View File

@ -0,0 +1 @@
修复 `mqtt.max_awaiting_rel` 更新不生效问题。