From 124c0e2dba25aee43c086fb8f9bd028a46e377de Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:28:13 +0100 Subject: [PATCH] fix(ds): Respect receive_maximum from the connection info --- .../emqx_persistent_message_ds_replayer.erl | 13 ++++- apps/emqx/src/emqx_persistent_session_ds.erl | 57 +++++++++++++------ 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 98bb069b0..69b6675d8 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -19,7 +19,7 @@ -module(emqx_persistent_message_ds_replayer). %% API: --export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3]). +-export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3, n_inflight/1]). %% internal exports: -export([]). @@ -79,6 +79,17 @@ next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqNo}) -> {PacketId, Inflight} end. +-spec n_inflight(inflight()) -> non_neg_integer(). +n_inflight(#inflight{next_seqno = NextSeqNo, acked_seqno = AckedSeqno}) -> + %% NOTE: this function assumes that gaps in the sequence ID occur + %% _only_ when the packet ID wraps: + case AckedSeqno >= ((NextSeqNo bsr 16) bsl 16) of + true -> + NextSeqNo - AckedSeqno; + false -> + NextSeqNo - AckedSeqno - 1 + end. + -spec replay(emqx_persistent_session_ds:id(), inflight()) -> emqx_session:replies(). replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index bc60a1277..6c0fc2dcc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -101,6 +101,8 @@ iterators := #{topic() => subscription()}, %% Inflight messages inflight := emqx_persistent_message_ds_replayer:inflight(), + %% Receive maximum + receive_maximum := pos_integer(), %% props := map() }. @@ -111,20 +113,28 @@ -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). +-define(STATS_KEYS, [ + subscriptions_cnt, + subscriptions_max, + inflight_cnt, + inflight_max, + next_pkt_id +]). + -export_type([id/0]). %% -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> session(). -create(#{clientid := ClientID}, _ConnInfo, Conf) -> +create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration ensure_timers(), - ensure_session(ClientID, Conf). + ensure_session(ClientID, ConnInfo, Conf). -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID}, _ConnInfo) -> +open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we @@ -133,16 +143,19 @@ open(#{clientid := ClientID}, _ConnInfo) -> %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), case open_session(ClientID) of - Session = #{} -> + Session0 = #{} -> ensure_timers(), + ReceiveMaximum = receive_maximum(ConnInfo), + Session = Session0#{receive_maximum => ReceiveMaximum}, {true, Session, []}; false -> false end. -ensure_session(ClientID, Conf) -> +ensure_session(ClientID, ConnInfo, Conf) -> {ok, Session, #{}} = session_ensure_new(ClientID, Conf), - Session#{iterators => #{}}. + ReceiveMaximum = receive_maximum(ConnInfo), + Session#{iterators => #{}, receive_maximum => ReceiveMaximum}. open_session(ClientID) -> case session_open(ClientID) of @@ -192,10 +205,10 @@ info(upgrade_qos, #{props := Conf}) -> maps:get(upgrade_qos, Conf); % info(inflight, #sessmem{inflight = Inflight}) -> % Inflight; -% info(inflight_cnt, #sessmem{inflight = Inflight}) -> -% emqx_inflight:size(Inflight); -% info(inflight_max, #sessmem{inflight = Inflight}) -> -% emqx_inflight:max_size(Inflight); +info(inflight_cnt, #{inflight := Inflight}) -> + emqx_persistent_message_ds_replayer:n_inflight(Inflight); +info(inflight_max, #{receive_maximum := ReceiveMaximum}) -> + ReceiveMaximum; info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); % info(mqueue, #sessmem{mqueue = MQueue}) -> @@ -206,8 +219,9 @@ info(retry_interval, #{props := Conf}) -> % emqx_mqueue:max_len(MQueue); % info(mqueue_dropped, #sessmem{mqueue = MQueue}) -> % emqx_mqueue:dropped(MQueue); -info(next_pkt_id, #{}) -> - _PacketId = 'TODO'; +info(next_pkt_id, #{inflight := Inflight}) -> + {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(Inflight), + PacketId; % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % AwaitingRel; % info(awaiting_rel_cnt, #sessmem{awaiting_rel = AwaitingRel}) -> @@ -219,8 +233,7 @@ info(await_rel_timeout, #{props := Conf}) -> -spec stats(session()) -> emqx_types:stats(). stats(Session) -> - % TODO: stub - info([], Session). + info(?STATS_KEYS, Session). %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE @@ -345,9 +358,12 @@ deliver(_ClientInfo, _Delivers, Session) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. -handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> - WindowSize = 1000, - {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), +handle_timeout( + _ClientInfo, + pull, + Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} +) -> + {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, ReceiveMaximum), %% TODO: make these values configurable: Timeout = case Publishes of @@ -781,6 +797,13 @@ ensure_timer(Type, Timeout) -> _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok. +-spec receive_maximum(conninfo()) -> pos_integer(). +receive_maximum(ConnInfo) -> + %% Note: the default value should be always set by the channel + %% with respect to the zone configuration, but the type spec + %% indicates that it's optional. + maps:get(receive_maximum, ConnInfo, 65_535). + -ifdef(TEST). list_all_sessions() -> DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),