Merge pull request #11948 from ieQu1/dev/EMQX-10998-receive-maximum

fix(ds): Respect receive_maximum from the connection info
This commit is contained in:
ieQu1 2023-11-14 16:45:36 +01:00 committed by GitHub
commit 8ddef21ac4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 18 deletions

View File

@ -19,7 +19,7 @@
-module(emqx_persistent_message_ds_replayer). -module(emqx_persistent_message_ds_replayer).
%% API: %% API:
-export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3]). -export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3, n_inflight/1]).
%% internal exports: %% internal exports:
-export([]). -export([]).
@ -79,6 +79,17 @@ next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqNo}) ->
{PacketId, Inflight} {PacketId, Inflight}
end. end.
-spec n_inflight(inflight()) -> non_neg_integer().
n_inflight(#inflight{next_seqno = NextSeqNo, acked_seqno = AckedSeqno}) ->
%% NOTE: this function assumes that gaps in the sequence ID occur
%% _only_ when the packet ID wraps:
case AckedSeqno >= ((NextSeqNo bsr 16) bsl 16) of
true ->
NextSeqNo - AckedSeqno;
false ->
NextSeqNo - AckedSeqno - 1
end.
-spec replay(emqx_persistent_session_ds:id(), inflight()) -> -spec replay(emqx_persistent_session_ds:id(), inflight()) ->
emqx_session:replies(). emqx_session:replies().
replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) -> replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) ->

View File

@ -101,6 +101,8 @@
iterators := #{topic() => subscription()}, iterators := #{topic() => subscription()},
%% Inflight messages %% Inflight messages
inflight := emqx_persistent_message_ds_replayer:inflight(), inflight := emqx_persistent_message_ds_replayer:inflight(),
%% Receive maximum
receive_maximum := pos_integer(),
%% %%
props := map() props := map()
}. }.
@ -111,20 +113,28 @@
-type conninfo() :: emqx_session:conninfo(). -type conninfo() :: emqx_session:conninfo().
-type replies() :: emqx_session:replies(). -type replies() :: emqx_session:replies().
-define(STATS_KEYS, [
subscriptions_cnt,
subscriptions_max,
inflight_cnt,
inflight_max,
next_pkt_id
]).
-export_type([id/0]). -export_type([id/0]).
%% %%
-spec create(clientinfo(), conninfo(), emqx_session:conf()) -> -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
session(). session().
create(#{clientid := ClientID}, _ConnInfo, Conf) -> create(#{clientid := ClientID}, ConnInfo, Conf) ->
% TODO: expiration % TODO: expiration
ensure_timers(), ensure_timers(),
ensure_session(ClientID, Conf). ensure_session(ClientID, ConnInfo, Conf).
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo()) ->
{_IsPresent :: true, session(), []} | false. {_IsPresent :: true, session(), []} | false.
open(#{clientid := ClientID}, _ConnInfo) -> open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
%% NOTE %% NOTE
%% The fact that we need to concern about discarding all live channels here %% The fact that we need to concern about discarding all live channels here
%% is essentially a consequence of the in-memory session design, where we %% is essentially a consequence of the in-memory session design, where we
@ -133,16 +143,19 @@ open(#{clientid := ClientID}, _ConnInfo) ->
%% space, and move this call back into `emqx_cm` where it belongs. %% space, and move this call back into `emqx_cm` where it belongs.
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
case open_session(ClientID) of case open_session(ClientID) of
Session = #{} -> Session0 = #{} ->
ensure_timers(), ensure_timers(),
ReceiveMaximum = receive_maximum(ConnInfo),
Session = Session0#{receive_maximum => ReceiveMaximum},
{true, Session, []}; {true, Session, []};
false -> false ->
false false
end. end.
ensure_session(ClientID, Conf) -> ensure_session(ClientID, ConnInfo, Conf) ->
{ok, Session, #{}} = session_ensure_new(ClientID, Conf), {ok, Session, #{}} = session_ensure_new(ClientID, Conf),
Session#{iterators => #{}}. ReceiveMaximum = receive_maximum(ConnInfo),
Session#{iterators => #{}, receive_maximum => ReceiveMaximum}.
open_session(ClientID) -> open_session(ClientID) ->
case session_open(ClientID) of case session_open(ClientID) of
@ -192,10 +205,10 @@ info(upgrade_qos, #{props := Conf}) ->
maps:get(upgrade_qos, Conf); maps:get(upgrade_qos, Conf);
% info(inflight, #sessmem{inflight = Inflight}) -> % info(inflight, #sessmem{inflight = Inflight}) ->
% Inflight; % Inflight;
% info(inflight_cnt, #sessmem{inflight = Inflight}) -> info(inflight_cnt, #{inflight := Inflight}) ->
% emqx_inflight:size(Inflight); emqx_persistent_message_ds_replayer:n_inflight(Inflight);
% info(inflight_max, #sessmem{inflight = Inflight}) -> info(inflight_max, #{receive_maximum := ReceiveMaximum}) ->
% emqx_inflight:max_size(Inflight); ReceiveMaximum;
info(retry_interval, #{props := Conf}) -> info(retry_interval, #{props := Conf}) ->
maps:get(retry_interval, Conf); maps:get(retry_interval, Conf);
% info(mqueue, #sessmem{mqueue = MQueue}) -> % info(mqueue, #sessmem{mqueue = MQueue}) ->
@ -206,8 +219,9 @@ info(retry_interval, #{props := Conf}) ->
% emqx_mqueue:max_len(MQueue); % emqx_mqueue:max_len(MQueue);
% info(mqueue_dropped, #sessmem{mqueue = MQueue}) -> % info(mqueue_dropped, #sessmem{mqueue = MQueue}) ->
% emqx_mqueue:dropped(MQueue); % emqx_mqueue:dropped(MQueue);
info(next_pkt_id, #{}) -> info(next_pkt_id, #{inflight := Inflight}) ->
_PacketId = 'TODO'; {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(Inflight),
PacketId;
% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) ->
% AwaitingRel; % AwaitingRel;
% info(awaiting_rel_cnt, #sessmem{awaiting_rel = AwaitingRel}) -> % info(awaiting_rel_cnt, #sessmem{awaiting_rel = AwaitingRel}) ->
@ -219,8 +233,7 @@ info(await_rel_timeout, #{props := Conf}) ->
-spec stats(session()) -> emqx_types:stats(). -spec stats(session()) -> emqx_types:stats().
stats(Session) -> stats(Session) ->
% TODO: stub info(?STATS_KEYS, Session).
info([], Session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
@ -345,9 +358,12 @@ deliver(_ClientInfo, _Delivers, Session) ->
-spec handle_timeout(clientinfo(), _Timeout, session()) -> -spec handle_timeout(clientinfo(), _Timeout, session()) ->
{ok, replies(), session()} | {ok, replies(), timeout(), session()}. {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> handle_timeout(
WindowSize = 1000, _ClientInfo,
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), pull,
Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
) ->
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, ReceiveMaximum),
%% TODO: make these values configurable: %% TODO: make these values configurable:
Timeout = Timeout =
case Publishes of case Publishes of
@ -781,6 +797,13 @@ ensure_timer(Type, Timeout) ->
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
ok. ok.
-spec receive_maximum(conninfo()) -> pos_integer().
receive_maximum(ConnInfo) ->
%% Note: the default value should be always set by the channel
%% with respect to the zone configuration, but the type spec
%% indicates that it's optional.
maps:get(receive_maximum, ConnInfo, 65_535).
-ifdef(TEST). -ifdef(TEST).
list_all_sessions() -> list_all_sessions() ->
DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),