diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 6eb375aba..1f878718c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -241,11 +241,31 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> emqx_cm_locker:trans(ClientId, ResumeStart). create_session(ClientInfo, ConnInfo) -> - Session = emqx_session:init(ClientInfo, ConnInfo), + Options = get_session_confs(ClientInfo, ConnInfo), + Session = emqx_session:init(Options), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. +get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> + #{max_subscriptions => get_conf(Zone, max_subscriptions), + upgrade_qos => get_conf(Zone, upgrade_qos), + max_inflight => MaxInflight, + retry_interval => get_conf(Zone, retry_interval), + await_rel_timeout => get_conf(Zone, await_rel_timeout), + mqueue => mqueue_confs(Zone) + }. + +mqueue_confs(Zone) -> + #{max_len => get_conf(Zone, max_mqueue_len), + store_qos0 => get_conf(Zone, mqueue_store_qos0), + priorities => get_conf(Zone, mqueue_priorities), + default_priority => get_conf(Zone, mqueue_default_priority) + }. + +get_conf(Zone, Key) -> + emqx_config:get_zone_conf(Zone, [mqtt, Key]). + %% @doc Try to takeover a session. -spec(takeover_session(emqx_types:clientid()) -> {error, term()} diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index c9a58edd8..c45f72401 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -55,7 +55,7 @@ -compile(nowarn_export_all). -endif. --export([init/2]). +-export([init/1]). -export([ info/1 , info/2 @@ -151,34 +151,41 @@ -define(DEFAULT_BATCH_N, 1000). +-type options() :: #{ max_subscriptions => non_neg_integer() + , upgrade_qos => boolean() + , retry_interval => timeout() + , max_awaiting_rel => non_neg_integer() | infinity + , await_rel_timeout => timeout() + , max_inflight => integer() + , mqueue => emqx_mqueue:options() + }. %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- --spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). -init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> - #session{max_subscriptions = get_conf(Zone, max_subscriptions), - subscriptions = #{}, - upgrade_qos = get_conf(Zone, upgrade_qos), - inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone), - next_pkt_id = 1, - retry_interval = timer:seconds(get_conf(Zone, retry_interval)), - awaiting_rel = #{}, - max_awaiting_rel = get_conf(Zone, max_awaiting_rel), - await_rel_timeout = timer:seconds(get_conf(Zone, await_rel_timeout)), - created_at = erlang:system_time(millisecond) - }. - -%% @private init mq -init_mqueue(Zone) -> - emqx_mqueue:init(#{ - max_len => get_conf(Zone, max_mqueue_len), - store_qos0 => get_conf(Zone, mqueue_store_qos0), - priorities => get_conf(Zone, mqueue_priorities), - default_priority => get_conf(Zone, mqueue_default_priority) - }). +-spec(init(options()) -> session()). +init(Opts) -> + MaxInflight = maps:get(max_inflight, Opts, 1), + QueueOpts = maps:merge( + #{max_len => 1000, + store_qos0 => false, + priorities => none, + default_priority => lowest + }, maps:get(mqueue, Opts, #{})), + #session{ + max_subscriptions = maps:get(max_subscriptions, Opts, 0), + subscriptions = #{}, + upgrade_qos = maps:get(upgrade_qos, Opts, false), + inflight = emqx_inflight:new(MaxInflight), + mqueue = emqx_mqueue:init(QueueOpts), + next_pkt_id = 1, + retry_interval = timer:seconds(maps:get(retry_interval, Opts, 0)), + awaiting_rel = #{}, + max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), + await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)), + created_at = erlang:system_time(millisecond) + }. %%-------------------------------------------------------------------- %% Info, Stats @@ -695,6 +702,3 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). - -get_conf(Zone, Key) -> - emqx_config:get_zone_conf(Zone, [mqtt, Key]).