refactor(emqx): strip the side effects of emqx_session module

This commit is contained in:
JianBo He 2021-07-20 19:01:53 +08:00
parent 0f6738f2a8
commit b782d4c53a
2 changed files with 52 additions and 28 deletions

View File

@ -241,11 +241,31 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
emqx_cm_locker:trans(ClientId, ResumeStart). emqx_cm_locker:trans(ClientId, ResumeStart).
create_session(ClientInfo, ConnInfo) -> create_session(ClientInfo, ConnInfo) ->
Session = emqx_session:init(ClientInfo, ConnInfo), Options = get_session_confs(ClientInfo, ConnInfo),
Session = emqx_session:init(Options),
ok = emqx_metrics:inc('session.created'), ok = emqx_metrics:inc('session.created'),
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
Session. Session.
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
#{max_subscriptions => get_conf(Zone, max_subscriptions),
upgrade_qos => get_conf(Zone, upgrade_qos),
max_inflight => MaxInflight,
retry_interval => get_conf(Zone, retry_interval),
await_rel_timeout => get_conf(Zone, await_rel_timeout),
mqueue => mqueue_confs(Zone)
}.
mqueue_confs(Zone) ->
#{max_len => get_conf(Zone, max_mqueue_len),
store_qos0 => get_conf(Zone, mqueue_store_qos0),
priorities => get_conf(Zone, mqueue_priorities),
default_priority => get_conf(Zone, mqueue_default_priority)
}.
get_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
%% @doc Try to takeover a session. %% @doc Try to takeover a session.
-spec(takeover_session(emqx_types:clientid()) -spec(takeover_session(emqx_types:clientid())
-> {error, term()} -> {error, term()}

View File

@ -55,7 +55,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-export([init/2]). -export([init/1]).
-export([ info/1 -export([ info/1
, info/2 , info/2
@ -151,34 +151,41 @@
-define(DEFAULT_BATCH_N, 1000). -define(DEFAULT_BATCH_N, 1000).
-type options() :: #{ max_subscriptions => non_neg_integer()
, upgrade_qos => boolean()
, retry_interval => timeout()
, max_awaiting_rel => non_neg_integer() | infinity
, await_rel_timeout => timeout()
, max_inflight => integer()
, mqueue => emqx_mqueue:options()
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Init a Session %% Init a Session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). -spec(init(options()) -> session()).
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> init(Opts) ->
#session{max_subscriptions = get_conf(Zone, max_subscriptions), MaxInflight = maps:get(max_inflight, Opts, 1),
subscriptions = #{}, QueueOpts = maps:merge(
upgrade_qos = get_conf(Zone, upgrade_qos), #{max_len => 1000,
inflight = emqx_inflight:new(MaxInflight), store_qos0 => false,
mqueue = init_mqueue(Zone), priorities => none,
next_pkt_id = 1, default_priority => lowest
retry_interval = timer:seconds(get_conf(Zone, retry_interval)), }, maps:get(mqueue, Opts, #{})),
awaiting_rel = #{}, #session{
max_awaiting_rel = get_conf(Zone, max_awaiting_rel), max_subscriptions = maps:get(max_subscriptions, Opts, 0),
await_rel_timeout = timer:seconds(get_conf(Zone, await_rel_timeout)), subscriptions = #{},
created_at = erlang:system_time(millisecond) upgrade_qos = maps:get(upgrade_qos, Opts, false),
}. inflight = emqx_inflight:new(MaxInflight),
mqueue = emqx_mqueue:init(QueueOpts),
%% @private init mq next_pkt_id = 1,
init_mqueue(Zone) -> retry_interval = timer:seconds(maps:get(retry_interval, Opts, 0)),
emqx_mqueue:init(#{ awaiting_rel = #{},
max_len => get_conf(Zone, max_mqueue_len), max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
store_qos0 => get_conf(Zone, mqueue_store_qos0), await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)),
priorities => get_conf(Zone, mqueue_priorities), created_at = erlang:system_time(millisecond)
default_priority => get_conf(Zone, mqueue_default_priority) }.
}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Stats %% Info, Stats
@ -695,6 +702,3 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) -> set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)), Pos = emqx_misc:index_of(Name, record_info(fields, session)),
setelement(Pos+1, Session, Value). setelement(Pos+1, Session, Value).
get_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]).