feat(sessds): Correct handling of gaps in the seqno series
This commit is contained in:
parent
fa359246c1
commit
94254ec05b
|
@ -389,7 +389,7 @@ publish(_PacketId, Msg, Session) ->
|
|||
puback(_ClientInfo, PacketId, Session0) ->
|
||||
case update_seqno(puback, PacketId, Session0) of
|
||||
{ok, Msg, Session} ->
|
||||
{ok, Msg, [], inc_send_quota(Session)};
|
||||
{ok, Msg, [], pull_now(Session)};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
@ -429,7 +429,7 @@ pubrel(_PacketId, Session = #{}) ->
|
|||
pubcomp(_ClientInfo, PacketId, Session0) ->
|
||||
case update_seqno(pubcomp, PacketId, Session0) of
|
||||
{ok, Msg, Session} ->
|
||||
{ok, Msg, [], inc_send_quota(Session)};
|
||||
{ok, Msg, [], pull_now(Session)};
|
||||
Error = {error, _} ->
|
||||
Error
|
||||
end.
|
||||
|
@ -907,11 +907,6 @@ ensure_timers(Session0) ->
|
|||
Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
|
||||
emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
|
||||
|
||||
-spec inc_send_quota(session()) -> session().
|
||||
inc_send_quota(Session = #{inflight := Inflight0}) ->
|
||||
Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0),
|
||||
pull_now(Session#{inflight => Inflight}).
|
||||
|
||||
-spec pull_now(session()) -> session().
|
||||
pull_now(Session) ->
|
||||
emqx_session:reset_timer(?TIMER_PULL, 0, Session).
|
||||
|
@ -957,26 +952,28 @@ try_get_live_session(ClientId) ->
|
|||
|
||||
-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
|
||||
{ok, emqx_types:message(), session()} | {error, _}.
|
||||
update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
|
||||
update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) ->
|
||||
SeqNo = packet_id_to_seqno(PacketId, S),
|
||||
case Track of
|
||||
puback ->
|
||||
QoS = ?QOS_1,
|
||||
SeqNoKey = ?committed(?QOS_1);
|
||||
SeqNoKey = ?committed(?QOS_1),
|
||||
Result = emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0);
|
||||
pubrec ->
|
||||
QoS = ?QOS_2,
|
||||
SeqNoKey = ?rec;
|
||||
SeqNoKey = ?rec,
|
||||
Result = emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0);
|
||||
pubcomp ->
|
||||
QoS = ?QOS_2,
|
||||
SeqNoKey = ?committed(?QOS_2)
|
||||
SeqNoKey = ?committed(?QOS_2),
|
||||
Result = emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0)
|
||||
end,
|
||||
Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S),
|
||||
case inc_seqno(QoS, Current) of
|
||||
SeqNo ->
|
||||
case Result of
|
||||
{ok, Inflight} ->
|
||||
%% TODO: we pass a bogus message into the hook:
|
||||
Msg = emqx_message:make(SessionId, <<>>, <<>>),
|
||||
{ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}};
|
||||
Expected ->
|
||||
{ok, Msg, Session#{
|
||||
s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
|
||||
inflight => Inflight
|
||||
}};
|
||||
{error, Expected} ->
|
||||
?SLOG(warning, #{
|
||||
msg => "out-of-order_commit",
|
||||
track => Track,
|
||||
|
|
|
@ -22,7 +22,9 @@
|
|||
pop/1,
|
||||
n_buffered/2,
|
||||
n_inflight/1,
|
||||
inc_send_quota/1,
|
||||
puback/2,
|
||||
pubrec/2,
|
||||
pubcomp/2,
|
||||
receive_maximum/1
|
||||
]).
|
||||
|
||||
|
@ -34,13 +36,28 @@
|
|||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
-type payload() ::
|
||||
{emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()}
|
||||
| {pubrel, emqx_persistent_session_ds:seqno()}.
|
||||
|
||||
-record(inflight, {
|
||||
queue :: queue:queue(),
|
||||
receive_maximum :: pos_integer(),
|
||||
%% Main queue:
|
||||
queue :: queue:queue(payload()),
|
||||
%% Queues that are used to track sequence numbers of ack tracks:
|
||||
puback_queue :: iqueue(),
|
||||
pubrec_queue :: iqueue(),
|
||||
pubcomp_queue :: iqueue(),
|
||||
%% Counters:
|
||||
n_inflight = 0 :: non_neg_integer(),
|
||||
n_qos0 = 0 :: non_neg_integer(),
|
||||
n_qos1 = 0 :: non_neg_integer(),
|
||||
|
@ -49,17 +66,19 @@
|
|||
|
||||
-type t() :: #inflight{}.
|
||||
|
||||
-type payload() ::
|
||||
{emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()}
|
||||
| {pubrel, emqx_persistent_session_ds:seqno()}.
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
%%================================================================================
|
||||
|
||||
-spec new(non_neg_integer()) -> t().
|
||||
new(ReceiveMaximum) when ReceiveMaximum > 0 ->
|
||||
#inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}.
|
||||
#inflight{
|
||||
receive_maximum = ReceiveMaximum,
|
||||
queue = queue:new(),
|
||||
puback_queue = iqueue_new(),
|
||||
pubrec_queue = iqueue_new(),
|
||||
pubcomp_queue = iqueue_new()
|
||||
}.
|
||||
|
||||
-spec receive_maximum(t()) -> pos_integer().
|
||||
receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) ->
|
||||
|
@ -86,6 +105,9 @@ pop(Rec0) ->
|
|||
receive_maximum = ReceiveMaximum,
|
||||
n_inflight = NInflight,
|
||||
queue = Q0,
|
||||
puback_queue = QAck,
|
||||
pubrec_queue = QRec,
|
||||
pubcomp_queue = QComp,
|
||||
n_qos0 = NQos0,
|
||||
n_qos1 = NQos1,
|
||||
n_qos2 = NQos2
|
||||
|
@ -96,17 +118,24 @@ pop(Rec0) ->
|
|||
case Payload of
|
||||
{pubrel, _} ->
|
||||
Rec0#inflight{queue = Q};
|
||||
{_, #message{qos = Qos}} ->
|
||||
{SeqNo, #message{qos = Qos}} ->
|
||||
case Qos of
|
||||
?QOS_0 ->
|
||||
Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1};
|
||||
?QOS_1 ->
|
||||
Rec0#inflight{
|
||||
queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1
|
||||
queue = Q,
|
||||
n_qos1 = NQos1 - 1,
|
||||
n_inflight = NInflight + 1,
|
||||
puback_queue = ipush(SeqNo, QAck)
|
||||
};
|
||||
?QOS_2 ->
|
||||
Rec0#inflight{
|
||||
queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1
|
||||
queue = Q,
|
||||
n_qos2 = NQos2 - 1,
|
||||
n_inflight = NInflight + 1,
|
||||
pubrec_queue = ipush(SeqNo, QRec),
|
||||
pubcomp_queue = ipush(SeqNo, QComp)
|
||||
}
|
||||
end
|
||||
end,
|
||||
|
@ -129,12 +158,191 @@ n_buffered(all, #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) ->
|
|||
n_inflight(#inflight{n_inflight = NInflight}) ->
|
||||
NInflight.
|
||||
|
||||
-spec puback(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
|
||||
Expected :: emqx_persistent_session_ds:seqno() | undefined.
|
||||
puback(SeqNo, Rec = #inflight{puback_queue = Q0, n_inflight = N}) ->
|
||||
case ipop(Q0) of
|
||||
{{value, SeqNo}, Q} ->
|
||||
{ok, Rec#inflight{
|
||||
puback_queue = Q,
|
||||
n_inflight = max(0, N - 1)
|
||||
}};
|
||||
{{value, Expected}, _} ->
|
||||
{error, Expected};
|
||||
_ ->
|
||||
{error, undefined}
|
||||
end.
|
||||
|
||||
-spec pubcomp(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
|
||||
Expected :: emqx_persistent_session_ds:seqno() | undefined.
|
||||
pubcomp(SeqNo, Rec = #inflight{pubcomp_queue = Q0, n_inflight = N}) ->
|
||||
case ipop(Q0) of
|
||||
{{value, SeqNo}, Q} ->
|
||||
{ok, Rec#inflight{
|
||||
pubcomp_queue = Q,
|
||||
n_inflight = max(0, N - 1)
|
||||
}};
|
||||
{{value, Expected}, _} ->
|
||||
{error, Expected};
|
||||
_ ->
|
||||
{error, undefined}
|
||||
end.
|
||||
|
||||
%% PUBREC doesn't affect inflight window:
|
||||
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control
|
||||
-spec inc_send_quota(t()) -> t().
|
||||
inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) ->
|
||||
NInflight = max(NInflight0 - 1, 0),
|
||||
Rec#inflight{n_inflight = NInflight}.
|
||||
-spec pubrec(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
|
||||
Expected :: emqx_persistent_session_ds:seqno() | undefined.
|
||||
pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) ->
|
||||
case ipop(Q0) of
|
||||
{{value, SeqNo}, Q} ->
|
||||
{ok, Rec#inflight{
|
||||
pubrec_queue = Q
|
||||
}};
|
||||
{{value, Expected}, _} ->
|
||||
{error, Expected};
|
||||
_ ->
|
||||
{error, undefined}
|
||||
end.
|
||||
|
||||
%%================================================================================
|
||||
%% Internal functions
|
||||
%%================================================================================
|
||||
|
||||
%%%% Interval queue:
|
||||
|
||||
%% "Interval queue": a data structure that represents a queue of
|
||||
%% monotonically increasing integers in a compact manner. It is
|
||||
%% functionally equivalent to a `queue:queue(integer())'.
|
||||
-record(iqueue, {
|
||||
%% Head interval:
|
||||
head :: integer() | undefined,
|
||||
head_end :: integer() | undefined,
|
||||
%% Intermediate ranges:
|
||||
queue :: queue:queue({integer(), integer()}),
|
||||
%% End interval:
|
||||
tail :: integer() | undefined,
|
||||
tail_end :: integer() | undefined
|
||||
}).
|
||||
|
||||
-type iqueue() :: #iqueue{}.
|
||||
|
||||
iqueue_new() ->
|
||||
#iqueue{
|
||||
queue = queue:new()
|
||||
}.
|
||||
|
||||
%% @doc Push a value into the interval queue:
|
||||
-spec ipush(integer(), iqueue()) -> iqueue().
|
||||
ipush(Val, Q = #iqueue{tail = undefined, tail_end = undefined}) ->
|
||||
Q#iqueue{
|
||||
tail = Val,
|
||||
tail_end = Val + 1
|
||||
};
|
||||
ipush(Val, Q = #iqueue{tail_end = Val}) ->
|
||||
%% Extend tail interval:
|
||||
Q#iqueue{
|
||||
tail_end = Val + 1
|
||||
};
|
||||
ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End ->
|
||||
IQ = queue:in({Tl, End}, IQ0),
|
||||
%% Begin a new interval:
|
||||
Q#iqueue{
|
||||
queue = IQ,
|
||||
tail = Val,
|
||||
tail_end = Val + 1
|
||||
}.
|
||||
|
||||
-spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}.
|
||||
ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd ->
|
||||
{{value, Hd}, Q#iqueue{head = Hd + 1}};
|
||||
ipop(Q = #iqueue{head = Hd0, tail = Tl, tail_end = TlEnd, queue = IQ0}) ->
|
||||
case queue:out(IQ0) of
|
||||
{{value, {Hd, HdEnd}}, IQ} ->
|
||||
ipop(Q#iqueue{head = nmax(Hd0, Hd), head_end = HdEnd, queue = IQ});
|
||||
{empty, _} ->
|
||||
do_ipop(Q#iqueue{head = nmax(Hd0, Tl), head_end = TlEnd})
|
||||
end.
|
||||
|
||||
do_ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd ->
|
||||
{{value, Hd}, Q#iqueue{head = Hd + 1}};
|
||||
do_ipop(Q) ->
|
||||
{empty, Q}.
|
||||
|
||||
nmax(undefined, N) ->
|
||||
N;
|
||||
nmax(N, undefined) ->
|
||||
N;
|
||||
nmax(N, M) ->
|
||||
max(N, M).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
%% Test that behavior of iqueue is identical to that of a regular queue of integers:
|
||||
iqueue_compat_test_() ->
|
||||
Props = [iqueue_compat()],
|
||||
Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
|
||||
{timeout, 30, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
|
||||
|
||||
%% Generate a sequence of pops and pushes with monotonically
|
||||
%% increasing arguments, and verify replaying produces equivalent
|
||||
%% results for the optimized and the reference implementation:
|
||||
iqueue_compat() ->
|
||||
?FORALL(
|
||||
Cmds,
|
||||
iqueue_commands(),
|
||||
begin
|
||||
lists:foldl(
|
||||
fun
|
||||
({push, N}, {IQ, Q, Acc}) ->
|
||||
{ipush(N, IQ), queue:in(N, Q), [N | Acc]};
|
||||
(pop, {IQ0, Q0, Acc}) ->
|
||||
{Ret, IQ} = ipop(IQ0),
|
||||
{Expected, Q} = queue:out(Q0),
|
||||
?assertEqual(
|
||||
Expected,
|
||||
Ret,
|
||||
#{
|
||||
sequence => lists:reverse(Acc),
|
||||
q => queue:to_list(Q0),
|
||||
iq0 => iqueue_print(IQ0),
|
||||
iq => iqueue_print(IQ)
|
||||
}
|
||||
),
|
||||
{IQ, Q, [pop | Acc]}
|
||||
end,
|
||||
{iqueue_new(), queue:new(), []},
|
||||
Cmds
|
||||
),
|
||||
true
|
||||
end
|
||||
).
|
||||
|
||||
iqueue_cmd() ->
|
||||
oneof([
|
||||
pop,
|
||||
{push, range(1, 3)}
|
||||
]).
|
||||
|
||||
iqueue_commands() ->
|
||||
?LET(
|
||||
Cmds,
|
||||
list(iqueue_cmd()),
|
||||
process_test_cmds(Cmds, 0)
|
||||
).
|
||||
|
||||
process_test_cmds([], _) ->
|
||||
[];
|
||||
process_test_cmds([pop | Tl], Cnt) ->
|
||||
[pop | process_test_cmds(Tl, Cnt)];
|
||||
process_test_cmds([{push, N} | Tl], Cnt0) ->
|
||||
Cnt = Cnt0 + N,
|
||||
[{push, Cnt} | process_test_cmds(Tl, Cnt)].
|
||||
|
||||
iqueue_print(I = #iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) ->
|
||||
#{
|
||||
hd => {Hd, HdEnd},
|
||||
tl => {Tl, TlEnd},
|
||||
q => queue:to_list(Q)
|
||||
}.
|
||||
|
||||
-endif.
|
||||
|
|
Loading…
Reference in New Issue