From 88103c5f0ecd6dd2c1063e86f978e9cb9bf8d5cb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 28 Nov 2023 19:13:14 +0300 Subject: [PATCH] refactor(session): pass config to `SessionImpl:open/3` as well * Anticipate that connection info may change during session takeover. * Avoid persisting session conf as part of persistent session state. --- apps/emqx/include/emqx_session_mem.hrl | 10 ++--- apps/emqx/src/emqx_persistent_session_ds.erl | 34 +++++++------- apps/emqx/src/emqx_session.erl | 28 +++++------- apps/emqx/src/emqx_session_mem.erl | 44 +++++++++++++------ apps/emqx/test/emqx_session_mem_SUITE.erl | 4 +- .../src/emqx_mqttsn_session.erl | 2 +- 6 files changed, 65 insertions(+), 57 deletions(-) diff --git a/apps/emqx/include/emqx_session_mem.hrl b/apps/emqx/include/emqx_session_mem.hrl index 9874a9018..ae155f766 100644 --- a/apps/emqx/include/emqx_session_mem.hrl +++ b/apps/emqx/include/emqx_session_mem.hrl @@ -26,9 +26,9 @@ %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed - max_subscriptions :: non_neg_integer() | infinity, + max_subscriptions = infinity :: non_neg_integer() | infinity, %% Upgrade QoS? - upgrade_qos :: boolean(), + upgrade_qos = false :: boolean(), %% Client <- Broker: QoS1/2 messages sent to the client but %% have not been unacked. inflight :: emqx_inflight:inflight(), @@ -40,14 +40,14 @@ %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval :: timeout(), + retry_interval = 0 :: timeout(), %% Client -> Broker: QoS2 messages received from the client, but %% have not been completely acknowledged awaiting_rel :: map(), %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel :: non_neg_integer() | infinity, + max_awaiting_rel = infinity :: non_neg_integer() | infinity, %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout :: timeout(), + await_rel_timeout = 0 :: timeout(), %% Created at created_at :: pos_integer() }). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b50ac8c64..5bf0a82a5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -29,7 +29,7 @@ %% Session API -export([ create/3, - open/2, + open/3, destroy/1 ]). @@ -119,6 +119,8 @@ conninfo := emqx_types:conninfo(), %% Timers timer() => reference(), + %% Upgrade QoS? + upgrade_qos := boolean(), %% props := map() }. @@ -151,11 +153,12 @@ session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration - ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). + Session = ensure_timers(session_ensure_new(ClientID, ConnInfo)), + preserve_conf(ConnInfo, Conf, Session). --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> +open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we @@ -165,20 +168,16 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - ReceiveMaximum = receive_maximum(ConnInfo), - Session = Session0#{receive_maximum => ReceiveMaximum}, + Session = preserve_conf(ConnInfo, Conf, Session0), {true, ensure_timers(Session), []}; false -> false end. -ensure_session(ClientID, ConnInfo, Conf) -> - Session = session_ensure_new(ClientID, ConnInfo, Conf), - ReceiveMaximum = receive_maximum(ConnInfo), +preserve_conf(ConnInfo, Conf, Session) -> Session#{ - conninfo => ConnInfo, - receive_maximum => ReceiveMaximum, - subscriptions => #{} + receive_maximum => receive_maximum(ConnInfo), + upgrade_qos => maps:get(upgrade_qos, Conf) }. -spec destroy(session() | clientinfo()) -> ok. @@ -644,25 +643,24 @@ session_open(SessionId, NewConnInfo) -> end end). --spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) -> +-spec session_ensure_new(id(), emqx_types:conninfo()) -> session(). -session_ensure_new(SessionId, ConnInfo, Props) -> +session_ensure_new(SessionId, ConnInfo) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), - Session = export_session(session_create(SessionId, ConnInfo, Props)), + Session = export_session(session_create(SessionId, ConnInfo)), Session#{ subscriptions => #{}, inflight => emqx_persistent_message_ds_replayer:new() } end). -session_create(SessionId, ConnInfo, Props) -> +session_create(SessionId, ConnInfo) -> Session = #session{ id = SessionId, created_at = now_ms(), last_alive_at = now_ms(), - conninfo = ConnInfo, - props = Props + conninfo = ConnInfo }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index f73ac812e..911bccb0a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -102,7 +102,7 @@ -export([should_keep/1]). % Tests only --export([get_session_conf/2]). +-export([get_session_conf/1]). -export_type([ t/0, @@ -137,8 +137,6 @@ -type conf() :: #{ %% Max subscriptions allowed max_subscriptions := non_neg_integer() | infinity, - %% Max inflight messages allowed - max_inflight := non_neg_integer(), %% Maximum number of awaiting QoS2 messages allowed max_awaiting_rel := non_neg_integer() | infinity, %% Upgrade QoS? @@ -171,7 +169,7 @@ -callback create(clientinfo(), conninfo(), conf()) -> t(). --callback open(clientinfo(), conninfo()) -> +-callback open(clientinfo(), conninfo(), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. @@ -181,7 +179,7 @@ -spec create(clientinfo(), conninfo()) -> t(). create(ClientInfo, ConnInfo) -> - Conf = get_session_conf(ClientInfo, ConnInfo), + Conf = get_session_conf(ClientInfo), create(ClientInfo, ConnInfo, Conf). create(ClientInfo, ConnInfo, Conf) -> @@ -198,12 +196,12 @@ create(Mod, ClientInfo, ConnInfo, Conf) -> -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. open(ClientInfo, ConnInfo) -> - Conf = get_session_conf(ClientInfo, ConnInfo), + Conf = get_session_conf(ClientInfo), Mods = [Default | _] = choose_impl_candidates(ConnInfo), %% NOTE %% Try to look the existing session up in session stores corresponding to the given %% `Mods` in order, starting from the last one. - case try_open(Mods, ClientInfo, ConnInfo) of + case try_open(Mods, ClientInfo, ConnInfo, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> @@ -212,24 +210,20 @@ open(ClientInfo, ConnInfo) -> {false, create(Default, ClientInfo, ConnInfo, Conf)} end. -try_open([Mod | Rest], ClientInfo, ConnInfo) -> - case try_open(Rest, ClientInfo, ConnInfo) of +try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) -> + case try_open(Rest, ClientInfo, ConnInfo, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> - Mod:open(ClientInfo, ConnInfo) + Mod:open(ClientInfo, ConnInfo, Conf) end; -try_open([], _ClientInfo, _ConnInfo) -> +try_open([], _ClientInfo, _ConnInfo, _Conf) -> false. --spec get_session_conf(clientinfo(), conninfo()) -> conf(). -get_session_conf( - #{zone := Zone}, - #{receive_maximum := MaxInflight} -) -> +-spec get_session_conf(clientinfo()) -> conf(). +get_session_conf(_ClientInfo = #{zone := Zone}) -> #{ max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), - max_inflight => MaxInflight, max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel), upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), retry_interval => get_mqtt_conf(Zone, retry_interval), diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 178c71e12..1d626fdb0 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -59,7 +59,7 @@ -export([ create/3, - open/2, + open/3, destroy/1 ]). @@ -152,24 +152,24 @@ -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> session(). -create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) -> +create( + #{zone := Zone, clientid := ClientId}, + #{expiry_interval := EI, receive_maximum := ReceiveMax}, + Conf +) -> QueueOpts = get_mqueue_conf(Zone), - #session{ + Session = #session{ id = emqx_guid:gen(), clientid = ClientId, created_at = erlang:system_time(millisecond), is_persistent = EI > 0, subscriptions = #{}, - inflight = emqx_inflight:new(maps:get(max_inflight, Conf)), + inflight = emqx_inflight:new(ReceiveMax), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - awaiting_rel = #{}, - max_subscriptions = maps:get(max_subscriptions, Conf), - max_awaiting_rel = maps:get(max_awaiting_rel, Conf), - upgrade_qos = maps:get(upgrade_qos, Conf), - retry_interval = maps:get(retry_interval, Conf), - await_rel_timeout = maps:get(await_rel_timeout, Conf) - }. + awaiting_rel = #{} + }, + preserve_conf(Conf, Session). get_mqueue_conf(Zone) -> #{ @@ -195,14 +195,16 @@ destroy(_Session) -> %% Open a (possibly existing) Session %%-------------------------------------------------------------------- --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. -open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> +open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> case emqx_cm:takeover_session_begin(ClientId) of {ok, SessionRemote, TakeoverState} -> - Session = resume(ClientInfo, SessionRemote), + Session0 = resume(ClientInfo, SessionRemote), case emqx_cm:takeover_session_end(TakeoverState) of {ok, Pendings} -> + Session1 = resize_inflight(ConnInfo, Session0), + Session = preserve_conf(Conf, Session1), clean_session(ClientInfo, Session, Pendings); {error, _} -> % TODO log error? @@ -212,6 +214,20 @@ open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> false end. +resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = Inflight}) -> + Session#session{ + inflight = emqx_inflight:resize(ReceiveMax, Inflight) + }. + +preserve_conf(Conf, Session = #session{}) -> + Session#session{ + max_subscriptions = maps:get(max_subscriptions, Conf), + max_awaiting_rel = maps:get(max_awaiting_rel, Conf), + upgrade_qos = maps:get(upgrade_qos, Conf), + retry_interval = maps:get(retry_interval, Conf), + await_rel_timeout = maps:get(await_rel_timeout, Conf) + }. + clean_session(ClientInfo, Session = #session{mqueue = Q}, Pendings) -> Q1 = emqx_mqueue:filter(fun emqx_session:should_keep/1, Q), Session1 = Session#session{mqueue = Q1}, diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index 7f10635c1..20d622941 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -67,7 +67,7 @@ t_session_init(_) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, - emqx_session:get_session_conf(ClientInfo, ConnInfo) + emqx_session:get_session_conf(ClientInfo) ), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), ?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)), @@ -531,7 +531,7 @@ session(InitFields) when is_map(InitFields) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, - emqx_session:get_session_conf(ClientInfo, ConnInfo) + emqx_session:get_session_conf(ClientInfo) ), maps:fold( fun(Field, Value, SessionAcc) -> diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl index 3621aa627..21f2c7e36 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -54,7 +54,7 @@ init(ClientInfo) -> ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, - SessionConf = emqx_session:get_session_conf(ClientInfo, ConnInfo), + SessionConf = emqx_session:get_session_conf(ClientInfo), #{ registry => emqx_mqttsn_registry:init(), session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf)