From 94254ec05b83f9fdce439cd1226b00c7403b3bf2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 7 Feb 2024 23:55:44 +0100 Subject: [PATCH] feat(sessds): Correct handling of gaps in the seqno series --- apps/emqx/src/emqx_persistent_session_ds.erl | 35 ++- .../emqx_persistent_session_ds_inflight.erl | 236 ++++++++++++++++-- 2 files changed, 238 insertions(+), 33 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 16b6db8a9..84f55e762 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -389,7 +389,7 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session0) -> case update_seqno(puback, PacketId, Session0) of {ok, Msg, Session} -> - {ok, Msg, [], inc_send_quota(Session)}; + {ok, Msg, [], pull_now(Session)}; Error -> Error end. @@ -429,7 +429,7 @@ pubrel(_PacketId, Session = #{}) -> pubcomp(_ClientInfo, PacketId, Session0) -> case update_seqno(pubcomp, PacketId, Session0) of {ok, Msg, Session} -> - {ok, Msg, [], inc_send_quota(Session)}; + {ok, Msg, [], pull_now(Session)}; Error = {error, _} -> Error end. @@ -907,11 +907,6 @@ ensure_timers(Session0) -> Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). --spec inc_send_quota(session()) -> session(). -inc_send_quota(Session = #{inflight := Inflight0}) -> - Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), - pull_now(Session#{inflight => Inflight}). - -spec pull_now(session()) -> session(). pull_now(Session) -> emqx_session:reset_timer(?TIMER_PULL, 0, Session). @@ -957,26 +952,28 @@ try_get_live_session(ClientId) -> -spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, _}. 
-update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> +update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) -> SeqNo = packet_id_to_seqno(PacketId, S), case Track of puback -> - QoS = ?QOS_1, - SeqNoKey = ?committed(?QOS_1); + SeqNoKey = ?committed(?QOS_1), + Result = emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0); pubrec -> - QoS = ?QOS_2, - SeqNoKey = ?rec; + SeqNoKey = ?rec, + Result = emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0); pubcomp -> - QoS = ?QOS_2, - SeqNoKey = ?committed(?QOS_2) + SeqNoKey = ?committed(?QOS_2), + Result = emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0) end, - Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S), - case inc_seqno(QoS, Current) of - SeqNo -> + case Result of + {ok, Inflight} -> %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(SessionId, <<>>, <<>>), - {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}}; - Expected -> + {ok, Msg, Session#{ + s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S), + inflight => Inflight + }}; + {error, Expected} -> ?SLOG(warning, #{ msg => "out-of-order_commit", track => Track, diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl index 1a603abde..349713bf6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -22,7 +22,9 @@ pop/1, n_buffered/2, n_inflight/1, - inc_send_quota/1, + puback/2, + pubrec/2, + pubcomp/2, receive_maximum/1 ]). @@ -34,13 +36,28 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-ifdef(TEST). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-endif. 
+ %%================================================================================ %% Type declarations %%================================================================================ +-type payload() :: + {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()} + | {pubrel, emqx_persistent_session_ds:seqno()}. + -record(inflight, { - queue :: queue:queue(), receive_maximum :: pos_integer(), + %% Main queue: + queue :: queue:queue(payload()), + %% Queues that are used to track sequence numbers of ack tracks: + puback_queue :: iqueue(), + pubrec_queue :: iqueue(), + pubcomp_queue :: iqueue(), + %% Counters: n_inflight = 0 :: non_neg_integer(), n_qos0 = 0 :: non_neg_integer(), n_qos1 = 0 :: non_neg_integer(), @@ -49,17 +66,19 @@ -type t() :: #inflight{}. --type payload() :: - {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()} - | {pubrel, emqx_persistent_session_ds:seqno()}. - %%================================================================================ %% API funcions %%================================================================================ -spec new(non_neg_integer()) -> t(). new(ReceiveMaximum) when ReceiveMaximum > 0 -> - #inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}. + #inflight{ + receive_maximum = ReceiveMaximum, + queue = queue:new(), + puback_queue = iqueue_new(), + pubrec_queue = iqueue_new(), + pubcomp_queue = iqueue_new() + }. -spec receive_maximum(t()) -> pos_integer(). 
receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) ->
@@ -86,6 +105,9 @@ pop(Rec0) ->
         receive_maximum = ReceiveMaximum,
         n_inflight = NInflight,
         queue = Q0,
+        puback_queue = QAck,
+        pubrec_queue = QRec,
+        pubcomp_queue = QComp,
         n_qos0 = NQos0,
         n_qos1 = NQos1,
         n_qos2 = NQos2
@@ -96,17 +118,24 @@ pop(Rec0) ->
                 case Payload of
                     {pubrel, _} ->
                         Rec0#inflight{queue = Q};
-                    {_, #message{qos = Qos}} ->
+                    {SeqNo, #message{qos = Qos}} ->
                         case Qos of
                             ?QOS_0 ->
                                 Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1};
                             ?QOS_1 ->
                                 Rec0#inflight{
-                                    queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1
+                                    queue = Q,
+                                    n_qos1 = NQos1 - 1,
+                                    n_inflight = NInflight + 1,
+                                    puback_queue = ipush(SeqNo, QAck)
                                 };
                             ?QOS_2 ->
                                 Rec0#inflight{
-                                    queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1
+                                    queue = Q,
+                                    n_qos2 = NQos2 - 1,
+                                    n_inflight = NInflight + 1,
+                                    pubrec_queue = ipush(SeqNo, QRec),
+                                    pubcomp_queue = ipush(SeqNo, QComp)
                                 }
                         end
                 end,
@@ -129,12 +158,191 @@ n_buffered(all, #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) ->
 n_inflight(#inflight{n_inflight = NInflight}) ->
     NInflight.
 
+%% @doc Acknowledge (PUBACK) the sequence number `SeqNo'.
+%%
+%% Succeeds only when `SeqNo' is the value at the head of
+%% `puback_queue' (filled by `pop/1' for QoS1 messages): the value is
+%% removed from the queue and `n_inflight' is decremented, but never
+%% below 0.
+%%
+%% When a different value is at the head, `{error, Expected}' is
+%% returned with that value; when the queue is empty, the result is
+%% `{error, undefined}'. The record is not updated in either case.
+-spec puback(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+puback(SeqNo, Rec = #inflight{puback_queue = Q0, n_inflight = N}) ->
+    case ipop(Q0) of
+        %% `SeqNo' is already bound: this clause matches only the expected ack.
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                puback_queue = Q,
+                n_inflight = max(0, N - 1)
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
+
+%% @doc Acknowledge (PUBCOMP) the sequence number `SeqNo'.
+%%
+%% Succeeds only when `SeqNo' is the value at the head of
+%% `pubcomp_queue' (filled by `pop/1' for QoS2 messages): the value is
+%% removed from the queue and `n_inflight' is decremented, but never
+%% below 0.
+%%
+%% When a different value is at the head, `{error, Expected}' is
+%% returned with that value; when the queue is empty, the result is
+%% `{error, undefined}'. The record is not updated in either case.
+-spec pubcomp(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+pubcomp(SeqNo, Rec = #inflight{pubcomp_queue = Q0, n_inflight = N}) ->
+    case ipop(Q0) of
+        %% `SeqNo' is already bound: this clause matches only the expected ack.
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                pubcomp_queue = Q,
+                n_inflight = max(0, N - 1)
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
+
+%% PUBREC doesn't affect inflight window:
 %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control
--spec inc_send_quota(t()) -> t().
-inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) ->
-    NInflight = max(NInflight0 - 1, 0),
-    Rec#inflight{n_inflight = NInflight}.
+%%
+%% Succeeds only when `SeqNo' is the value at the head of
+%% `pubrec_queue': the value is removed and `n_inflight' is left
+%% untouched. When a different value is at the head, the result is
+%% `{error, Expected}' with that value; when the queue is empty, it
+%% is `{error, undefined}'.
+-spec pubrec(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) ->
+    case ipop(Q0) of
+        %% `SeqNo' is already bound: this clause matches only the expected ack.
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                pubrec_queue = Q
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
 
 %%================================================================================
 %% Internal functions
 %%================================================================================
+
+%%%% Interval queue:
+
+%% "Interval queue": a data structure that represents a queue of
+%% monotonically increasing integers in a compact manner. It is
+%% functionally equivalent to a `queue:queue(integer())'.
+%%
+%% Runs of consecutive integers are stored as half-open intervals
+%% `[Begin, End)' instead of individual elements.
+-record(iqueue, {
+    %% Head interval `[head, head_end)': `head' is the next value to
+    %% be popped. Both fields stay `undefined' until the first pop:
+    head :: integer() | undefined,
+    head_end :: integer() | undefined,
+    %% Intermediate ranges: closed intervals `{Begin, End}' (`End' is
+    %% exclusive) that were moved here from the tail, oldest first:
+    queue :: queue:queue({integer(), integer()}),
+    %% End interval `[tail, tail_end)': the interval that pushes
+    %% extend. Both fields stay `undefined' until the first push:
+    tail :: integer() | undefined,
+    tail_end :: integer() | undefined
+}).
+
+-type iqueue() :: #iqueue{}.
+
+%% Create an empty interval queue: no intermediate ranges, and all
+%% interval bounds left `undefined'.
+iqueue_new() ->
+    #iqueue{
+        queue = queue:new()
+    }.
+
+%% @doc Push a value into the interval queue:
+%%
+%% The value must not be smaller than the current `tail_end': no
+%% clause matches such a value, so the call fails with a
+%% `function_clause' error.
+-spec ipush(integer(), iqueue()) -> iqueue().
+ipush(Val, Q = #iqueue{tail = undefined, tail_end = undefined}) ->
+    %% First push: start the tail interval `[Val, Val + 1)':
+    Q#iqueue{
+        tail = Val,
+        tail_end = Val + 1
+    };
+ipush(Val, Q = #iqueue{tail_end = Val}) ->
+    %% Extend tail interval:
+    Q#iqueue{
+        tail_end = Val + 1
+    };
+ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End ->
+    %% There is a gap between the tail interval and `Val': move the
+    %% finished tail interval to the intermediate ranges.
+    IQ = queue:in({Tl, End}, IQ0),
+    %% Begin a new interval:
+    Q#iqueue{
+        queue = IQ,
+        tail = Val,
+        tail_end = Val + 1
+    }.
+
+%% @doc Pop the oldest value from the interval queue. Returns
+%% `{{value, N}, NewQueue}', or `{empty, Queue}' when nothing is left.
+-spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}.
+ipop(IQueue) ->
+    case do_ipop(IQueue) of
+        {empty, _} ->
+            %% Head interval is exhausted (or was never set up):
+            ipop_refill(IQueue);
+        Popped ->
+            Popped
+    end.
+
+%% Load the next interval into the head and retry. Intermediate
+%% ranges go first; the tail interval is used once they run out.
+%% `nmax/2' keeps the head from moving backwards when the incoming
+%% interval has already been partially consumed.
+ipop_refill(IQueue = #iqueue{head = PrevHead, queue = Ranges0}) ->
+    case queue:out(Ranges0) of
+        {empty, _} ->
+            #iqueue{tail = Tail, tail_end = TailEnd} = IQueue,
+            do_ipop(IQueue#iqueue{head = nmax(PrevHead, Tail), head_end = TailEnd});
+        {{value, {From, To}}, Ranges} ->
+            ipop(IQueue#iqueue{head = nmax(PrevHead, From), head_end = To, queue = Ranges})
+    end.
+
+%% Pop from the head interval only, without refilling it:
+do_ipop(IQueue = #iqueue{head = Next, head_end = Limit}) when is_number(Limit), Next < Limit ->
+    {{value, Next}, IQueue#iqueue{head = Next + 1}};
+do_ipop(IQueue) ->
+    {empty, IQueue}.
+
+%% Maximum of two values where `undefined' stands for "no value":
+nmax(undefined, B) ->
+    B;
+nmax(A, undefined) ->
+    A;
+nmax(A, B) when A >= B ->
+    A;
+nmax(_A, B) ->
+    B.
+
+-ifdef(TEST).
+
+%% Check that iqueue behaves exactly like a plain queue of integers:
+iqueue_compat_test_() ->
+    QuickcheckOpts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
+    Checks = [?_assert(proper:quickcheck(P, QuickcheckOpts)) || P <- [iqueue_compat()]],
+    {timeout, 30, Checks}.
+
+%% Property: replaying a generated sequence of pops and pushes (with
+%% monotonically increasing arguments) gives the same results for the
+%% optimized and the reference implementation:
+iqueue_compat() ->
+    ?FORALL(
+        Cmds,
+        iqueue_commands(),
+        begin
+            InitState = {iqueue_new(), queue:new(), []},
+            _ = lists:foldl(fun iqueue_compat_step/2, InitState, Cmds),
+            true
+        end
+    ).
+
+%% Apply one command to both queues; every pop is compared with the
+%% reference. The third element of the state is the command history
+%% (newest first), reported when the comparison fails.
+iqueue_compat_step({push, Val}, {IQ, RefQ, History}) ->
+    {ipush(Val, IQ), queue:in(Val, RefQ), [Val | History]};
+iqueue_compat_step(pop, {IQ0, RefQ0, History}) ->
+    {Expected, RefQ} = queue:out(RefQ0),
+    {Got, IQ} = ipop(IQ0),
+    Context = #{
+        sequence => lists:reverse(History),
+        q => queue:to_list(RefQ0),
+        iq0 => iqueue_print(IQ0),
+        iq => iqueue_print(IQ)
+    },
+    ?assertEqual(Expected, Got, Context),
+    {IQ, RefQ, [pop | History]}.
+
+%% Generator of a single command; a push carries an increment of 1..3:
+iqueue_cmd() ->
+    Push = {push, range(1, 3)},
+    oneof([pop, Push]).
+
+%% Generator of a command list with increments resolved to values:
+iqueue_commands() ->
+    ?LET(
+        RawCmds,
+        list(iqueue_cmd()),
+        process_test_cmds(RawCmds, 0)
+    ).
+
+%% Turn the increments carried by `{push, N}' commands into absolute
+%% values: every push adds its `N' to the running counter and is
+%% replaced by `{push, Counter}'. `pop' commands are kept as they are
+%% and do not change the counter.
+process_test_cmds([], _) ->
+    [];
+process_test_cmds([pop | Tl], Cnt) ->
+    [pop | process_test_cmds(Tl, Cnt)];
+process_test_cmds([{push, N} | Tl], Cnt0) ->
+    Cnt = Cnt0 + N,
+    [{push, Cnt} | process_test_cmds(Tl, Cnt)].
+
+%% Render an interval queue as a map (head interval, tail interval and
+%% the list of intermediate ranges) for use in assertion messages.
+iqueue_print(#iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) ->
+    #{
+        hd => {Hd, HdEnd},
+        tl => {Tl, TlEnd},
+        q => queue:to_list(Q)
+    }.
+
+-endif.