fix: rework In-flight / Mqueue API

- use timestamp 'position' to find the next chunk of data
- add 'start' position to response meta
- sort In-flight messages by insertion time
- sort Mqueue messages by priority
This commit is contained in:
Serge Tupchii 2024-03-18 21:20:28 +02:00
parent 5390203184
commit cb5fdb3c79
15 changed files with 597 additions and 357 deletions

View File

@ -36,8 +36,7 @@
max_size/1,
is_full/1,
is_empty/1,
window/1,
query/2
window/1
]).
-export_type([inflight/0]).
@ -139,47 +138,3 @@ size(?INFLIGHT(Tree)) ->
-spec max_size(inflight()) -> non_neg_integer().
max_size(?INFLIGHT(MaxSize, _Tree)) ->
MaxSize.
-spec query(inflight(), #{continuation => Cont, limit := L}) ->
{[{key(), term()}], #{continuation := Cont, count := C}}
when
Cont :: none | end_of_data | key(),
L :: non_neg_integer(),
C :: non_neg_integer().
query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
Count = gb_trees:size(Tree),
ContKey = maps:get(continuation, Pager, none),
{List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit),
{List, #{continuation => NextCont, count => Count}}.
iterator_from(none, Tree) ->
gb_trees:iterator(Tree);
iterator_from(ContKey, Tree) ->
It = gb_trees:iterator_from(ContKey, Tree),
case gb_trees:next(It) of
{ContKey, _Val, ItNext} -> ItNext;
_ -> It
end.
sublist(_It, 0) ->
{[], none};
sublist(It, Len) ->
{ListAcc, HasNext} = sublist(It, Len, []),
{lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
sublist(It, 0, Acc) ->
{Acc, gb_trees:next(It) =/= none};
sublist(It, Len, Acc) ->
case gb_trees:next(It) of
none ->
{Acc, false};
{Key, Val, ItNext} ->
sublist(ItNext, Len - 1, [{Key, Val} | Acc])
end.
next_cont(_Acc, false) ->
end_of_data;
next_cont([{LastKey, _LastVal} | _Acc], _HasNext) ->
LastKey;
next_cont([], _HasNext) ->
end_of_data.

View File

@ -98,6 +98,7 @@
-define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-define(INSERT_TS, mqueue_insert_ts).
-record(shift_opts, {
multiplier :: non_neg_integer(),
@ -172,54 +173,82 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
end.
-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) ->
{[message()], #{continuation := ContMsgId, count := C}}
-spec query(mqueue(), #{position => Pos, limit := Limit}) ->
{[message()], #{position := Pos, start := Pos}}
when
ContMsgId :: none | end_of_data | binary(),
C :: non_neg_integer(),
L :: non_neg_integer().
query(MQ, #{limit := Limit} = Pager) ->
ContMsgId = maps:get(continuation, Pager, none),
{List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit),
{List, #{continuation => NextCont, count => len(MQ)}}.
Pos :: none | {integer(), priority()},
Limit :: non_neg_integer().
query(MQ, #{limit := Limit} = PagerParams) ->
Pos = maps:get(position, PagerParams, none),
PQsList = ?PQUEUE:to_queues_list(MQ#mqueue.q),
{Msgs, NxtPos} = sublist(skip_until(PQsList, Pos), Limit, [], Pos),
{Msgs, #{position => NxtPos, start => first_msg_pos(PQsList)}}.
skip_until(MQ, none = _MsgId) ->
MQ;
skip_until(MQ, MsgId) ->
do_skip_until(MQ, MsgId).
do_skip_until(MQ, MsgId) ->
case out(MQ) of
{empty, MQ} ->
MQ;
{{value, #message{id = MsgId}}, Q1} ->
Q1;
{{value, _Msg}, Q1} ->
do_skip_until(Q1, MsgId)
first_msg_pos([]) ->
none;
first_msg_pos([{Prio, PQ} | T]) ->
case ?PQUEUE:out(PQ) of
{empty, _PQ} ->
first_msg_pos(T);
{{value, Msg}, _Q} ->
{insert_ts(Msg), Prio}
end.
sublist(_MQ, 0) ->
{[], none};
sublist(MQ, Len) ->
{ListAcc, HasNext} = sublist(MQ, Len, []),
{lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
sublist(MQ, 0, Acc) ->
{Acc, element(1, out(MQ)) =/= empty};
sublist(MQ, Len, Acc) ->
case out(MQ) of
{empty, _MQ} ->
{Acc, false};
{{value, Msg}, Q1} ->
sublist(Q1, Len - 1, [Msg | Acc])
skip_until(PQsList, none = _Pos) ->
PQsList;
skip_until(PQsList, {MsgPos, PrioPos}) ->
case skip_until_prio(PQsList, PrioPos) of
[{Prio, PQ} | T] ->
PQ1 = skip_until_msg(PQ, MsgPos),
[{Prio, PQ1} | T];
[] ->
[]
end.
next_cont(_Acc, false) ->
end_of_data;
next_cont([#message{id = Id} | _Acc], _HasNext) ->
Id;
next_cont([], _HasNext) ->
end_of_data.
skip_until_prio(PQsList, PrioPos) ->
lists:dropwhile(fun({Prio, _PQ}) -> Prio > PrioPos end, PQsList).
skip_until_msg(PQ, MsgPos) ->
case ?PQUEUE:out(PQ) of
{empty, PQ1} ->
PQ1;
{{value, Msg}, PQ1} ->
case insert_ts(Msg) > MsgPos of
true -> PQ;
false -> skip_until_msg(PQ1, MsgPos)
end
end.
sublist(PQs, Len, Acc, LastPosPrio) when PQs =:= []; Len =:= 0 ->
{Acc, LastPosPrio};
sublist([{Prio, PQ} | T], Len, Acc, LastPosPrio) ->
{SingleQAcc, SingleQSize} = sublist_single_pq(Prio, PQ, Len, [], 0),
Acc1 = Acc ++ lists:reverse(SingleQAcc),
NxtPosPrio =
case SingleQAcc of
[H | _] -> {insert_ts(H), Prio};
[] -> LastPosPrio
end,
case SingleQSize =:= Len of
true ->
{Acc1, NxtPosPrio};
false ->
sublist(T, Len - SingleQSize, Acc1, NxtPosPrio)
end.
sublist_single_pq(_Prio, _PQ, 0, Acc, AccSize) ->
{Acc, AccSize};
sublist_single_pq(Prio, PQ, Len, Acc, AccSize) ->
case ?PQUEUE:out(0, PQ) of
{empty, _PQ} ->
{Acc, AccSize};
{{value, Msg}, PQ1} ->
Msg1 = with_prio(Msg, Prio),
sublist_single_pq(Prio, PQ1, Len - 1, [Msg1 | Acc], AccSize + 1)
end.
with_prio(#message{extra = Extra} = Msg, Prio) ->
Msg#message{extra = Extra#{mqueue_priority => Prio}}.
to_list(MQ, Acc) ->
case out(MQ) of
@ -256,14 +285,15 @@ in(
) ->
Priority = get_priority(Topic, PTab, Dp),
PLen = ?PQUEUE:plen(Priority, Q),
Msg1 = with_ts(Msg),
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
true ->
%% reached max length, drop the oldest message
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
Q2 = ?PQUEUE:in(Msg1, Priority, Q1),
{without_ts(DroppedMsg), MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg1, Priority, Q)}}
end.
-spec out(mqueue()) -> {empty | {value, message()}, mqueue()}.
@ -280,7 +310,7 @@ out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts
last_prio = Prio,
p_credit = get_credits(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
{{value, without_ts(Val)}, MQ1};
out(MQ = #mqueue{q = Q, p_credit = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
@ -288,8 +318,12 @@ out(MQ = #mqueue{q = Q, p_credit = 0}) ->
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
{R, Q2} =
case ?PQUEUE:out(Q) of
{{value, Val}, Q1} -> {{value, without_ts(Val)}, Q1};
Other -> Other
end,
{R, MQ#mqueue{q = Q2, len = Len - 1, p_credit = Cnt - 1}}.
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
@ -359,3 +393,23 @@ p_table(PTab = #{}) ->
);
p_table(PTab) ->
PTab.
%% This is used to sort/traverse messages in query/2
with_ts(#message{extra = Extra} = Msg) ->
TsNano = erlang:system_time(nanosecond),
Extra1 =
case is_map(Extra) of
true -> Extra;
%% extra field has not being used before EMQX 5.4.0
%% and defaulted to an empty list,
%% if it's not a map it's safe to overwrite it
false -> #{}
end,
Msg#message{extra = Extra1#{?INSERT_TS => TsNano}}.
without_ts(#message{extra = Extra} = Msg) ->
Msg#message{extra = maps:remove(?INSERT_TS, Extra)};
without_ts(Msg) ->
Msg.
insert_ts(#message{extra = #{?INSERT_TS := Ts}}) -> Ts.

View File

@ -46,6 +46,7 @@
len/1,
plen/2,
to_list/1,
to_queues_list/1,
from_list/1,
in/2,
in/3,
@ -121,6 +122,18 @@ to_list({pqueue, Queues}) ->
{0, V} <- to_list(Q)
].
-spec to_queues_list(pqueue()) -> [{priority(), squeue()}].
to_queues_list({queue, _In, _Out, _Len} = Squeue) ->
[{0, Squeue}];
to_queues_list({pqueue, Queues}) ->
lists:sort(
fun
({infinity = _P1, _}, {_P2, _}) -> true;
({P1, _}, {P2, _}) -> P1 >= P2
end,
[{maybe_negate_priority(P), Q} || {P, Q} <- Queues]
).
-spec from_list([{priority(), any()}]) -> pqueue().
from_list(L) ->
lists:foldl(fun({P, E}, Q) -> in(E, P, Q) end, new(), L).

View File

@ -146,6 +146,8 @@
-define(DEFAULT_BATCH_N, 1000).
-define(INFLIGHT_INSERT_TS, inflight_insert_ts).
%%--------------------------------------------------------------------
%% Init a Session
%%--------------------------------------------------------------------
@ -269,8 +271,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
{InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
{[I#inflight_data.message || {_, I} <- InflightList], Meta};
inflight_query(Inflight, PagerParams);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
@ -396,7 +397,7 @@ puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session1 = Session#session{inflight = Inflight1},
{ok, Replies, Session2} = dequeue(ClientInfo, Session1),
{ok, Msg, Replies, Session2};
{ok, without_inflight_insert_ts(Msg), Replies, Session2};
{value, _} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -415,7 +416,7 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) ->
{value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
Update = Data#inflight_data{phase = wait_comp},
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{ok, without_inflight_insert_ts(Msg), Session#session{inflight = Inflight1}};
{value, _} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -451,7 +452,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session1 = Session#session{inflight = Inflight1},
{ok, Replies, Session2} = dequeue(ClientInfo, Session1),
{ok, Msg, Replies, Session2};
{ok, without_inflight_insert_ts(Msg), Replies, Session2};
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -636,7 +637,7 @@ do_retry_delivery(
_ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
{Acc, emqx_inflight:delete(PacketId, Inflight)};
false ->
Msg1 = emqx_message:set_flag(dup, true, Msg),
Msg1 = without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg)),
Update = Data#inflight_data{message = Msg1, timestamp = Now},
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{[{PacketId, Msg1} | Acc], Inflight1}
@ -728,7 +729,7 @@ replay(ClientInfo, Session) ->
({PacketId, #inflight_data{phase = wait_comp}}) ->
{pubrel, PacketId};
({PacketId, #inflight_data{message = Msg}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
{PacketId, without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg))}
end,
emqx_inflight:to_list(Session#session.inflight)
),
@ -775,7 +776,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
{true, Msg};
{true, without_inflight_insert_ts(Msg)};
({_PacketId, #inflight_data{}}) ->
false
end,
@ -798,22 +799,83 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
%% Helper functions
%%--------------------------------------------------------------------
-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
-compile(
{inline, [
sort_fun/2, batch_n/1, inflight_insert_ts/1, without_inflight_insert_ts/1, with_ts/1, age/2
]}
).
sort_fun({_, A}, {_, B}) ->
A#inflight_data.timestamp =< B#inflight_data.timestamp.
query_sort_fun({_, #inflight_data{message = A}}, {_, #inflight_data{message = B}}) ->
inflight_insert_ts(A) =< inflight_insert_ts(B).
-spec inflight_query(emqx_inflight:inflight(), #{
position => integer() | none, limit := pos_integer()
}) ->
{[emqx_types:message()], #{position := integer() | none, start := integer() | none}}.
inflight_query(Inflight, #{limit := Limit} = PagerParams) ->
InflightL = emqx_inflight:to_list(fun query_sort_fun/2, Inflight),
StartPos =
case InflightL of
[{_, #inflight_data{message = FirstM}} | _] -> inflight_insert_ts(FirstM);
[] -> none
end,
Position = maps:get(position, PagerParams, none),
InflightMsgs = sublist_from_pos(InflightL, Position, Limit),
NextPos =
case InflightMsgs of
[_ | _] = L ->
inflight_insert_ts(lists:last(L));
[] ->
Position
end,
{InflightMsgs, #{start => StartPos, position => NextPos}}.
sublist_from_pos(InflightList, none = _Position, Limit) ->
inflight_msgs_sublist(InflightList, Limit);
sublist_from_pos(InflightList, Position, Limit) ->
Inflight = lists:dropwhile(
fun({_, #inflight_data{message = M}}) ->
inflight_insert_ts(M) =< Position
end,
InflightList
),
inflight_msgs_sublist(Inflight, Limit).
%% Small optimization to get sublist and drop keys in one traversal
inflight_msgs_sublist([{_Key, #inflight_data{message = Msg}} | T], Limit) when Limit > 0 ->
[Msg | inflight_msgs_sublist(T, Limit - 1)];
inflight_msgs_sublist(_, _) ->
[].
inflight_insert_ts(#message{extra = #{?INFLIGHT_INSERT_TS := Ts}}) -> Ts.
without_inflight_insert_ts(#message{extra = Extra} = Msg) ->
Msg#message{extra = maps:remove(?INFLIGHT_INSERT_TS, Extra)}.
batch_n(Inflight) ->
case emqx_inflight:max_size(Inflight) of
0 -> ?DEFAULT_BATCH_N;
Sz -> Sz - emqx_inflight:size(Inflight)
end.
with_ts(Msg) ->
with_ts(#message{extra = Extra} = Msg) ->
InsertTsNano = erlang:system_time(nanosecond),
%% This is used to sort/traverse messages in inflight_query/2
Extra1 =
case is_map(Extra) of
true -> Extra;
%% extra field has not being used before EMQX 5.4.0 and defaulted to an empty list,
%% if it's not a map it's safe to overwrite it
false -> #{}
end,
Msg1 = Msg#message{extra = Extra1#{?INFLIGHT_INSERT_TS => InsertTsNano}},
#inflight_data{
phase = wait_ack,
message = Msg,
timestamp = erlang:system_time(millisecond)
message = Msg1,
timestamp = erlang:convert_time_unit(InsertTsNano, nanosecond, millisecond)
}.
age(Now, Ts) -> Now - Ts.

View File

@ -126,73 +126,3 @@ t_to_list(_) ->
),
ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).
t_query(_) ->
EmptyInflight = emqx_inflight:new(500),
?assertMatch(
{[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50})
),
Inflight = lists:foldl(
fun(Seq, QAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
end,
EmptyInflight,
lists:reverse(lists:seq(1, 114))
),
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
Limit = 10,
PagerParams = #{continuation => Cont, limit => Limit},
{Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
?assertMatch(
#{count := 114, continuation := IntCont} when is_integer(IntCont),
Meta
),
NextCont
end,
none,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
continuation => LastCont, limit => 10
}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10})
),
{LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual({1, <<"1">>}, hd(LargePage)),
?assertEqual({114, <<"114">>}, lists:last(LargePage)),
?assertMatch(#{continuation := end_of_data}, LargeMeta),
{FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual({1, <<"1">>}, hd(FullPage)),
?assertEqual({114, <<"114">>}, lists:last(FullPage)),
?assertMatch(#{continuation := end_of_data}, FullMeta),
{EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{continuation := none, count := 114}, EmptyMeta).

View File

@ -284,13 +284,15 @@ t_dropped(_) ->
t_query(_) ->
EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
?assertMatch(
{[], #{continuation := end_of_data}},
?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50})
?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
RandPos = {erlang:system_time(nanosecond), 0},
?assertEqual(
{[], #{position => RandPos, start => none}},
?Q:query(EmptyQ, #{position => RandPos, limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50})
?assertEqual(
{[], #{position => none, start => none}},
?Q:query(EmptyQ, #{continuation => none, limit => 50})
),
Q = lists:foldl(
@ -303,52 +305,146 @@ t_query(_) ->
lists:seq(1, 114)
),
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
{LastPos, LastStart} = lists:foldl(
fun(PageSeq, {Pos, PrevStart}) ->
Limit = 10,
PagerParams = #{continuation => Cont, limit => Limit},
{Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams),
PagerParams = #{position => Pos, limit => Limit},
{Page, #{position := NextPos, start := Start}} = ?Q:query(Q, PagerParams),
?assertEqual(10, length(Page)),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(
ExpFirstPayload,
emqx_message:payload(lists:nth(1, Page)),
#{page_seq => PageSeq, page => Page, meta => Meta}
),
?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta),
NextCont
FirstMsg = lists:nth(1, Page),
LastMsg = lists:nth(10, Page),
?assertEqual(ExpFirstPayload, emqx_message:payload(FirstMsg)),
?assertEqual(ExpLastPayload, emqx_message:payload(LastMsg)),
%% start value must not change as Mqueue is not modified during traversal
NextStart =
case PageSeq of
1 ->
?assertEqual({mqueue_ts(FirstMsg), 0}, Start),
Start;
_ ->
?assertEqual(PrevStart, Start),
PrevStart
end,
none,
{NextPos, NextStart}
end,
{none, none},
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}),
{LastPartialPage, #{position := FinalPos} = LastMeta} = ?Q:query(Q, #{
position => LastPos, limit => 10
}),
LastMsg = lists:nth(4, LastPartialPage),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
?assertMatch(
{[], #{continuation := end_of_data}},
?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10})
?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
?assertEqual(#{position => {mqueue_ts(LastMsg), 0}, start => LastStart}, LastMeta),
?assertEqual(
{[], #{start => LastStart, position => FinalPos}},
?Q:query(Q, #{position => FinalPos, limit => 10})
),
{LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
{LargePage, LargeMeta} = ?Q:query(Q, #{position => none, limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertMatch(#{continuation := end_of_data}, LargeMeta),
?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
{FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
?assertMatch(#{continuation := end_of_data}, FullMeta),
{FullPage, FullMeta} = ?Q:query(Q, #{position => none, limit => 114}),
?assertEqual(LargePage, FullPage),
?assertEqual(LargeMeta, FullMeta),
{EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
{_, Q1} = emqx_mqueue:out(Q),
{PageAfterRemove, #{start := StartAfterRemove}} = ?Q:query(Q1, #{position => none, limit => 10}),
?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
?assertEqual(StartAfterRemove, {mqueue_ts(hd(PageAfterRemove)), 0}).
t_query_with_priorities(_) ->
Priorities = #{<<"t/infinity">> => infinity, <<"t/10">> => 10, <<"t/5">> => 5},
EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true, priorities => Priorities}),
?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
RandPos = {erlang:system_time(nanosecond), 0},
?assertEqual(
{[], #{position => RandPos, start => none}},
?Q:query(EmptyQ, #{position => RandPos, limit => 50})
),
?assertEqual(
{[], #{position => none, start => none}},
?Q:query(EmptyQ, #{continuation => none, limit => 50})
),
{Q, ExpMsgsAcc} = lists:foldl(
fun(Topic, {QAcc, MsgsAcc}) ->
{TopicQ, TopicMsgs} =
lists:foldl(
fun(Seq, {TopicQAcc, TopicMsgsAcc}) ->
Payload = <<Topic/binary, "_", (integer_to_binary(Seq))/binary>>,
Msg = emqx_message:make(Topic, Payload),
{_, TopicQAcc1} = ?Q:in(Msg, TopicQAcc),
{TopicQAcc1, [Msg | TopicMsgsAcc]}
end,
{QAcc, []},
lists:seq(1, 10)
),
{TopicQ, [lists:reverse(TopicMsgs) | MsgsAcc]}
end,
{EmptyQ, []},
[<<"t/test">>, <<"t/5">>, <<"t/infinity">>, <<"t/10">>]
),
%% Manual resorting from the highest to the lowest priority
[ExpMsgsPrio0, ExpMsgsPrio5, ExpMsgsPrioInf, ExpMsgsPrio10] = lists:reverse(ExpMsgsAcc),
ExpMsgs = ExpMsgsPrioInf ++ ExpMsgsPrio10 ++ ExpMsgsPrio5 ++ ExpMsgsPrio0,
{AllMsgs, #{start := StartPos, position := Pos}} = ?Q:query(Q, #{position => none, limit => 40}),
?assertEqual(40, length(AllMsgs)),
?assertEqual(ExpMsgs, with_empty_extra(AllMsgs)),
FirstMsg = hd(AllMsgs),
LastMsg = lists:last(AllMsgs),
?assertEqual(<<"t/infinity_1">>, emqx_message:payload(FirstMsg)),
?assertEqual(StartPos, {mqueue_ts(FirstMsg), infinity}),
?assertEqual(<<"t/test_10">>, emqx_message:payload(LastMsg)),
?assertMatch({_, 0}, Pos),
?assertEqual(Pos, {mqueue_ts(LastMsg), mqueue_prio(LastMsg)}),
Pos5 = {mqueue_ts(lists:nth(5, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
LastInfPos = {mqueue_ts(lists:nth(10, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
{MsgsPrioInfTo10, #{start := StartPos, position := PosPrio10Msg5}} = ?Q:query(Q, #{
position => Pos5, limit => 10
}),
?assertEqual(10, length(MsgsPrioInfTo10)),
?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo10))),
?assertEqual(<<"t/10_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo10))),
?assertEqual(PosPrio10Msg5, {
mqueue_ts(lists:last(MsgsPrioInfTo10)), mqueue_prio(lists:last(MsgsPrioInfTo10))
}),
{MsgsPrioInfTo5, #{start := StartPos, position := PosPrio5Msg5}} = ?Q:query(Q, #{
position => Pos5, limit => 20
}),
?assertEqual(20, length(MsgsPrioInfTo5)),
?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo5))),
?assertEqual(<<"t/5_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo5))),
?assertEqual(PosPrio5Msg5, {
mqueue_ts(lists:last(MsgsPrioInfTo5)), mqueue_prio(lists:last(MsgsPrioInfTo5))
}),
{MsgsPrio10, #{start := StartPos, position := PosPrio10}} = ?Q:query(Q, #{
position => LastInfPos, limit => 10
}),
?assertEqual(ExpMsgsPrio10, with_empty_extra(MsgsPrio10)),
?assertEqual(10, length(MsgsPrio10)),
?assertEqual(<<"t/10_1">>, emqx_message:payload(hd(MsgsPrio10))),
?assertEqual(<<"t/10_10">>, emqx_message:payload(lists:last(MsgsPrio10))),
?assertEqual(PosPrio10, {mqueue_ts(lists:last(MsgsPrio10)), mqueue_prio(lists:last(MsgsPrio10))}),
{MsgsPrio10To5, #{start := StartPos, position := _}} = ?Q:query(Q, #{
position => LastInfPos, limit => 20
}),
?assertEqual(ExpMsgsPrio10 ++ ExpMsgsPrio5, with_empty_extra(MsgsPrio10To5)).
conservation_prop() ->
?FORALL(
@ -413,3 +509,9 @@ drain(Q) ->
{{value, #message{topic = T, payload = P}}, Q1} ->
[{T, P} | drain(Q1)]
end.
mqueue_ts(#message{extra = #{mqueue_insert_ts := Ts}}) -> Ts.
mqueue_prio(#message{extra = #{mqueue_priority := Prio}}) -> Prio.
with_empty_extra(Msgs) ->
[M#message{extra = #{}} || M <- Msgs].

View File

@ -19,6 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -115,6 +116,80 @@ t_session_stats(_) ->
maps:from_list(Stats)
).
t_session_inflight_query(_) ->
EmptyInflight = emqx_inflight:new(500),
Session = session(#{inflight => EmptyInflight}),
EmptyQueryResMeta = {[], #{position => none, start => none}},
?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
RandPos = erlang:system_time(nanosecond),
?assertEqual({[], #{position => RandPos, start => none}}, inflight_query(Session, RandPos, 10)),
Inflight = lists:foldl(
fun(Seq, Acc) ->
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, integer_to_binary(Seq)),
emqx_inflight:insert(Seq, emqx_session_mem:with_ts(Msg), Acc)
end,
EmptyInflight,
lists:seq(1, 114)
),
Session1 = session(#{inflight => Inflight}),
{LastPos, LastStart} = lists:foldl(
fun(PageSeq, {Pos, PrevStart}) ->
Limit = 10,
{Page, #{position := NextPos, start := Start}} = inflight_query(Session1, Pos, Limit),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
FirstMsg = lists:nth(1, Page),
LastMsg = lists:nth(10, Page),
?assertEqual(integer_to_binary(ExpFirst), emqx_message:payload(FirstMsg)),
?assertEqual(integer_to_binary(ExpLast), emqx_message:payload(LastMsg)),
%% start value must not change as Inflight is not modified during traversal
NextStart =
case PageSeq of
1 ->
?assertEqual(inflight_ts(FirstMsg), Start),
Start;
_ ->
?assertEqual(PrevStart, Start),
PrevStart
end,
?assertEqual(inflight_ts(LastMsg), NextPos),
{NextPos, NextStart}
end,
{none, none},
lists:seq(1, 11)
),
{LastPartialPage, #{position := FinalPos} = LastMeta} = inflight_query(
Session1, LastPos, 10
),
LastMsg = lists:nth(4, LastPartialPage),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
?assertEqual(#{position => inflight_ts(LastMsg), start => LastStart}, LastMeta),
?assertEqual(
{[], #{start => LastStart, position => FinalPos}},
inflight_query(Session1, FinalPos, 10)
),
{LargePage, LargeMeta} = inflight_query(Session1, none, 1000),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
{FullPage, FullMeta} = inflight_query(Session1, none, 114),
?assertEqual(LargePage, FullPage),
?assertEqual(LargeMeta, FullMeta),
Session2 = session(#{inflight => emqx_inflight:delete(1, Inflight)}),
{PageAfterRemove, #{start := StartAfterRemove}} = inflight_query(Session2, none, 10),
?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
?assertEqual(StartAfterRemove, inflight_ts(hd(PageAfterRemove))).
%%--------------------------------------------------------------------
%% Test cases for sub/unsub
%%--------------------------------------------------------------------
@ -274,9 +349,10 @@ t_pubrel_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, session()).
t_pubcomp(_) ->
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, Msg), emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, undefined, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
{ok, Msg, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)).
t_pubcomp_error_packetid_in_use(_) ->
@ -598,3 +674,8 @@ set_duplicate_pub({Id, Msg}) ->
get_packet_id({Id, _}) ->
Id.
inflight_query(Session, Pos, Limit) ->
emqx_session_mem:info({inflight_msgs, #{position => Pos, limit => Limit}}, Session).
inflight_ts(#message{extra = #{inflight_insert_ts := Ts}}) -> Ts.

View File

@ -178,36 +178,27 @@ fields(hasnext) ->
>>,
Meta = #{desc => Desc, required => true},
[{hasnext, hoconsc:mk(boolean(), Meta)}];
fields('after') ->
Desc = <<
"The value of \"last\" field returned in the previous response. It can then be used"
" in subsequent requests to get the next chunk of results.<br/>"
"It is used instead of \"page\" parameter to traverse volatile data.<br/>"
"Can be omitted or set to \"none\" to get the first chunk of data.<br/>"
"\last\" = end_of_data\" is returned, if there is no more data.<br/>"
"Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
" error response."
>>,
Meta = #{
in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
},
[{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(last) ->
fields(position) ->
Desc = <<
"An opaque token that can then be in subsequent requests to get "
" the next chunk of results: \"?after={last}\"<br/>"
"if there is no more data, \"last\" = end_of_data\" is returned.<br/>"
"Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
" error response."
" the next chunk of results: \"?position={prev_response.meta.position}\"<br/>"
"It is used instead of \"page\" parameter to traverse highly volatile data.<br/>"
"Can be omitted or set to \"none\" to get the first chunk of data."
>>,
Meta = #{
desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">>
in => query, desc => Desc, required => false, example => <<"none">>
},
[{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
[{position, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(start) ->
Desc = <<"The position of the current first element of the data collection.">>,
Meta = #{
desc => Desc, required => true, example => <<"none">>
},
[{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
fields(continuation_meta) ->
fields(last) ++ fields(count).
fields(start) ++ fields(position).
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
schema_with_example(Type, Example) ->

View File

@ -15,6 +15,3 @@
%%--------------------------------------------------------------------
-define(DEFAULT_ROW_LIMIT, 100).
-define(URL_PARAM_INTEGER, url_param_integer).
-define(URL_PARAM_BINARY, url_param_binary).

View File

@ -39,7 +39,6 @@
-export([
parse_pager_params/1,
parse_cont_pager_params/2,
encode_cont_pager_params/2,
parse_qstring/2,
init_query_result/0,
init_query_state/5,
@ -138,32 +137,18 @@ page(Params) ->
limit(Params) when is_map(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
continuation(Params, Encoding) ->
position(Params, Decoder) ->
try
decode_continuation(maps:get(<<"after">>, Params, none), Encoding)
decode_position(maps:get(<<"position">>, Params, none), Decoder)
catch
_:_ ->
error
end.
decode_continuation(none, _Encoding) ->
decode_position(none, _Decoder) ->
none;
decode_continuation(end_of_data, _Encoding) ->
%% Clients should not send "after=end_of_data" back to the server
error;
decode_continuation(Cont, ?URL_PARAM_INTEGER) ->
binary_to_integer(Cont);
decode_continuation(Cont, ?URL_PARAM_BINARY) ->
emqx_utils:hexstr_to_bin(Cont).
encode_continuation(none, _Encoding) ->
none;
encode_continuation(end_of_data, _Encoding) ->
end_of_data;
encode_continuation(Cont, ?URL_PARAM_INTEGER) ->
integer_to_binary(Cont);
encode_continuation(Cont, ?URL_PARAM_BINARY) ->
emqx_utils:bin_to_hexstr(Cont, lower).
decode_position(Pos, Decoder) ->
Decoder(Pos).
%%--------------------------------------------------------------------
%% Node Query
@ -670,25 +655,18 @@ parse_pager_params(Params) ->
false
end.
-spec parse_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) ->
#{limit := pos_integer(), continuation := none | end_of_table | binary()} | false.
parse_cont_pager_params(Params, Encoding) ->
Cont = continuation(Params, Encoding),
-spec parse_cont_pager_params(map(), fun((binary()) -> term())) ->
#{limit := pos_integer(), position := none | term()} | false.
parse_cont_pager_params(Params, PositionDecoder) ->
Pos = position(Params, PositionDecoder),
Limit = b2i(limit(Params)),
case Limit > 0 andalso Cont =/= error of
case Limit > 0 andalso Pos =/= error of
true ->
#{continuation => Cont, limit => Limit};
#{position => Pos, limit => Limit};
false ->
false
end.
-spec encode_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) -> map().
encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) ->
Meta1 = maps:remove(continuation, Meta),
Meta1#{last => encode_continuation(Cont, ContEncoding)};
encode_cont_pager_params(Meta, _ContEncoding) ->
Meta.
%%--------------------------------------------------------------------
%% Types
%%--------------------------------------------------------------------

View File

@ -413,11 +413,11 @@ schema("/clients/:clientid/keepalive") ->
}
};
schema("/clients/:clientid/mqueue_messages") ->
ContExample = <<"AAYS53qRa0n07AAABFIACg">>,
ContExample = <<"1710785444656449826_10">>,
RespSchema = ?R_REF(mqueue_messages),
client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema);
schema("/clients/:clientid/inflight_messages") ->
ContExample = <<"10">>,
ContExample = <<"1710785444656449826">>,
RespSchema = ?R_REF(inflight_messages),
client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema);
schema("/sessions_count") ->
@ -656,7 +656,7 @@ fields(unsubscribe) ->
];
fields(mqueue_messages) ->
[
{data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})},
{data, hoconsc:mk(hoconsc:array(?REF(mqueue_message)), #{desc => ?DESC(mqueue_msgs_list)})},
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
];
fields(inflight_messages) ->
@ -672,7 +672,17 @@ fields(message) ->
{publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})},
{from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
{from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})},
{inserted_at, hoconsc:mk(binary(), #{desc => ?DESC(msg_inserted_at)})}
];
fields(mqueue_message) ->
fields(message) ++
[
{mqueue_priority,
hoconsc:mk(
hoconsc:union([integer(), infinity]),
#{desc => ?DESC(msg_mqueue_priority)}
)}
];
fields(requested_client_fields) ->
%% NOTE: some Client fields actually returned in response are missing in schema:
@ -920,7 +930,7 @@ client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
responses => #{
200 =>
emqx_dashboard_swagger:schema_with_example(RespSchema, #{
<<"data">> => [message_example()],
<<"data">> => [message_example(OpId)],
<<"meta">> => #{
<<"count">> => 100,
<<"last">> => ContExample
@ -963,7 +973,7 @@ client_msgs_params() ->
>>,
validator => fun max_bytes_validator/1
})},
hoconsc:ref(emqx_dashboard_swagger, 'after'),
hoconsc:ref(emqx_dashboard_swagger, position),
hoconsc:ref(emqx_dashboard_swagger, limit)
].
@ -1200,9 +1210,9 @@ is_live_session(ClientId) ->
[] =/= emqx_cm_registry:lookup_channels(ClientId).
list_client_msgs(MsgType, ClientID, QString) ->
case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
case emqx_mgmt_api:parse_cont_pager_params(QString, pos_decoder(MsgType)) of
false ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}};
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"position_limit_invalid">>}};
PagerParams = #{} ->
case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of
{error, not_found} ->
@ -1212,10 +1222,34 @@ list_client_msgs(MsgType, ClientID, QString) ->
end
end.
%% integer packet id
cont_encoding(inflight_msgs) -> ?URL_PARAM_INTEGER;
%% binary message id
cont_encoding(mqueue_msgs) -> ?URL_PARAM_BINARY.
pos_decoder(mqueue_msgs) -> fun decode_mqueue_pos/1;
pos_decoder(inflight_msgs) -> fun decode_msg_pos/1.
encode_msgs_meta(_MsgType, #{start := StartPos, position := Pos}) ->
#{start => encode_pos(StartPos), position => encode_pos(Pos)}.
encode_pos(none) ->
none;
encode_pos({MsgPos, PrioPos}) ->
MsgPosBin = integer_to_binary(MsgPos),
PrioPosBin =
case PrioPos of
infinity -> <<"infinity">>;
_ -> integer_to_binary(PrioPos)
end,
<<MsgPosBin/binary, "_", PrioPosBin/binary>>;
encode_pos(Pos) when is_integer(Pos) ->
integer_to_binary(Pos).
-spec decode_mqueue_pos(binary()) -> {integer(), infinity | integer()}.
decode_mqueue_pos(Pos) ->
[MsgPos, PrioPos] = binary:split(Pos, <<"_">>),
{decode_msg_pos(MsgPos), decode_priority_pos(PrioPos)}.
decode_msg_pos(Pos) -> binary_to_integer(Pos).
decode_priority_pos(<<"infinity">>) -> infinity;
decode_priority_pos(Pos) -> binary_to_integer(Pos).
max_bytes_validator(MaxBytes) when is_integer(MaxBytes), MaxBytes > 0 ->
ok;
@ -1415,8 +1449,8 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
<<"payload">> := PayloadFmt,
<<"max_payload_bytes">> := MaxBytes
} = QString,
Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)),
Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)},
Meta1 = encode_msgs_meta(MsgType, Meta),
Resp = #{meta => Meta1, data => format_msgs(MsgType, Msgs, PayloadFmt, MaxBytes)},
%% Make sure minirest won't set another content-type for self-encoded JSON response body
Headers = #{<<"content-type">> => <<"application/json">>},
case emqx_utils_json:safe_encode(Resp) of
@ -1432,13 +1466,13 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
?INTERNAL_ERROR(Error)
end.
format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
format_msgs(MsgType, [FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
%% Always include at least one message payload, even if it exceeds the limit
{FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt),
{FirstMsg1, PayloadSize0} = format_msg(MsgType, FirstMsg, PayloadFmt),
{Msgs1, _} =
catch lists:foldl(
fun(Msg, {MsgsAcc, SizeAcc} = Acc) ->
{Msg1, PayloadSize} = format_msg(Msg, PayloadFmt),
{Msg1, PayloadSize} = format_msg(MsgType, Msg, PayloadFmt),
case SizeAcc + PayloadSize of
SizeAcc1 when SizeAcc1 =< MaxBytes ->
{[Msg1 | MsgsAcc], SizeAcc1};
@ -1450,10 +1484,11 @@ format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
Msgs
),
lists:reverse(Msgs1);
format_msgs([], _PayloadFmt, _MaxBytes) ->
format_msgs(_MsgType, [], _PayloadFmt, _MaxBytes) ->
[].
format_msg(
MsgType,
#message{
id = ID,
qos = Qos,
@ -1462,10 +1497,10 @@ format_msg(
timestamp = Timestamp,
headers = Headers,
payload = Payload
},
} = Msg,
PayloadFmt
) ->
Msg = #{
MsgMap = #{
msgid => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
@ -1473,15 +1508,23 @@ format_msg(
from_clientid => emqx_utils_conv:bin(From),
from_username => maps:get(username, Headers, <<>>)
},
format_payload(PayloadFmt, Msg, Payload).
MsgMap1 = format_by_msg_type(MsgType, Msg, MsgMap),
format_payload(PayloadFmt, MsgMap1, Payload).
format_payload(none, Msg, _Payload) ->
{Msg, 0};
format_payload(base64, Msg, Payload) ->
format_by_msg_type(mqueue_msgs, Msg, MsgMap) ->
#message{extra = #{mqueue_priority := Prio, mqueue_insert_ts := Ts}} = Msg,
MsgMap#{mqueue_priority => Prio, inserted_at => integer_to_binary(Ts)};
format_by_msg_type(inflight_msgs, Msg, MsgMap) ->
#message{extra = #{inflight_insert_ts := Ts}} = Msg,
MsgMap#{inserted_at => integer_to_binary(Ts)}.
format_payload(none, MsgMap, _Payload) ->
{MsgMap, 0};
format_payload(base64, MsgMap, Payload) ->
Payload1 = base64:encode(Payload),
{Msg#{payload => Payload1}, erlang:byte_size(Payload1)};
format_payload(plain, Msg, Payload) ->
{Msg#{payload => Payload}, erlang:iolist_size(Payload)}.
{MsgMap#{payload => Payload1}, erlang:byte_size(Payload1)};
format_payload(plain, MsgMap, Payload) ->
{MsgMap#{payload => Payload}, erlang:iolist_size(Payload)}.
%% format func helpers
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
@ -1584,6 +1627,11 @@ client_example() ->
<<"recv_msg.qos0">> => 0
}.
message_example(inflight_msgs) ->
message_example();
message_example(mqueue_msgs) ->
(message_example())#{<<"mqueue_priority">> => 0}.
message_example() ->
#{
<<"msgid">> => <<"000611F460D57FA9F44500000D360002">>,

View File

@ -1070,18 +1070,19 @@ t_mqueue_messages(Config) ->
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
IsMqueue = true,
test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=10&after=not-base64%23%21", AuthHeader
get, Path, "limit=10&position=not-valid", AuthHeader
)
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
get, Path, "limit=-5&position=not-valid", AuthHeader
)
).
@ -1093,18 +1094,21 @@ t_inflight_messages(Config) ->
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
InflightLimit = emqx:get_config([mqtt, max_inflight]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
IsMqueue = false,
test_messages(
Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=10&after=not-int", AuthHeader
get, Path, "limit=10&position=not-int", AuthHeader
)
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=-5&after=invalid-int", AuthHeader
get, Path, "limit=-5&position=invalid-int", AuthHeader
)
),
emqtt:stop(Client).
@ -1142,19 +1146,16 @@ publish_msgs(Topic, Count) ->
lists:seq(1, Count)
).
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
?assertMatch(
#{
<<"last">> := <<"end_of_data">>,
<<"count">> := Count
},
Meta
),
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
?assertEqual(length(Msgs), Count),
lists:foreach(
fun({Seq, #{<<"payload">> := P} = M}) ->
?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
@ -1165,10 +1166,12 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
<<"qos">> := _,
<<"publish_at">> := _,
<<"from_clientid">> := _,
<<"from_username">> := _
<<"from_username">> := _,
<<"inserted_at">> := _
},
M
)
),
IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M)
end,
lists:zip(lists:seq(1, Count), Msgs)
),
@ -1183,62 +1186,69 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
get, Path, QsPayloadLimit, AuthHeader
),
#{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
ct:pal("~p", [FirstMsgOnly]),
?assertEqual(1, length(FirstMsgOnly)),
?assertEqual(
<<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
),
Limit = 19,
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
{ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
LastPos = lists:foldl(
fun(PageSeq, ThisPos) ->
Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]),
{ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
#{
<<"meta">> := #{<<"last">> := NextCont} = MetaP,
<<"data">> := MsgsP
} = emqx_utils_json:decode(MsgsRespP),
?assertMatch(#{<<"count">> := Count}, MetaP),
?assertNotEqual(<<"end_of_data">>, NextCont),
?assertEqual(length(MsgsP), Limit),
<<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart},
<<"data">> := MsgsPage
} = emqx_utils_json:decode(MsgsRespPage),
?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)),
%% Start position is the same in every response and points to the first msg
?assertEqual(StartPos, ThisStart),
?assertEqual(length(MsgsPage), Limit),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(
ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
ExpFirstPayload,
decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding)
),
?assertEqual(
ExpLastPayload,
decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding)
),
NextCont
NextPos
end,
none,
lists:seq(1, Count div 19)
),
LastPartialPage = Count div 19 + 1,
LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]),
{ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
#{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
#{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode(
MsgsRespLastP
),
?assertEqual(<<"end_of_data">>, EmptyCont),
?assertMatch(#{<<"count">> := Count}, MetaLastP),
%% The same as the position of all messages returned in one request
?assertEqual(Pos, LastPartialPos),
?assertEqual(
integer_to_binary(LastPartialPage * Limit - Limit + 1),
decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding)
),
?assertEqual(
integer_to_binary(Count),
decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding)
),
ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
PayloadEncoding, EmptyCont, Limit
ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [
PayloadEncoding, LastPartialPos, Limit
]),
{ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
#{
<<"data">> := [],
<<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos}
},
emqx_utils_json:decode(MsgsEmptyResp)
),
%% Invalid common page params
@ -1269,6 +1279,11 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
).
msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) ->
<<TsBin/binary, "_", (emqx_utils_conv:bin(Prio))/binary>>;
msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) ->
TsBin.
decode_payload(Payload, base64) ->
base64:decode(Payload);
decode_payload(Payload, _) ->

View File

@ -37,6 +37,9 @@
%% Timestamp (Unit: millisecond)
timestamp :: integer(),
%% Miscellaneous extensions, currently used for OpenTelemetry context propagation
%% and storing mqueue/inflight insertion timestamps.
%% It was not used prior to 5.4.0 and defaulted to an empty list.
%% Must be a map now.
extra = #{} :: term()
}).

View File

@ -1,21 +1,20 @@
Implement HTTP APIs to get the list of client's inflight and mqueue messages.
Implement HTTP APIs to get the list of client's in-flight and mqueue messages.
To get the first chunk of data:
- GET /clients/{clientid}/mqueue_messages?limit=100
- GET /clients/{clientid}/inflight_messages?limit=100
Alternatively:
- GET /clients/{clientid}/mqueue_messages?limit=100&after=none
- GET /clients/{clientid}/inflight_messages?limit=100&after=none
- GET /clients/{clientid}/mqueue_messages?limit=100&position=none
- GET /clients/{clientid}/inflight_messages?limit=100&position=none
To get the next chunk of data:
- GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
- GET /clients/{clientid}/inflight_messages?limit=100&after={last}
- GET /clients/{clientid}/mqueue_messages?limit=100&position={position}
- GET /clients/{clientid}/inflight_messages?limit=100&position={position}
Where {last} is a value (opaque string token) of "meta.last" field from the previous response.
Where {position} is a value (opaque string token) of "meta.position" field from the previous response.
If there is no more data, "last" = "end_of_data" is returned.
If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
Mqueue messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority.
By default, all messages in Mqueue have the same priority of 0.
Mqueue messages are ordered according to the queue (FIFO) order.
Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order.
In-flight messages are ordered by time at which they were inserted to the in-flight storage (from older to newer messages).

View File

@ -41,20 +41,22 @@ get_client_mqueue_msgs.label:
"""Get client mqueue messages"""
get_client_inflight_msgs.desc:
"""Get client inflight messages"""
"""Get client in-flight messages"""
get_client_inflight_msgs.label:
"""Get client inflight messages"""
"""Get client in-flight messages"""
mqueue_msgs_list.desc:
"""Client's mqueue messages list. The queue (FIFO) ordering is preserved."""
"""Client's mqueue messages list.
Messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority.
By default, all messages in Mqueue have the same priority of 0."""
mqueue_msgs_list.label:
"""Client's mqueue messages"""
inflight_msgs_list.desc:
"""Client's inflight messages list.
Ordered by MQTT Packet Id, which may not represent the chronological messages order."""
"""Client's in-flight messages list.
Messages are sorted by time at which they were inserted to the In-flight storage (from older to newer messages)."""
inflight_msgs_list.label:
"""Client's inflight messages"""
"""Client's in-flight messages"""
msg_id.desc:
"""Message ID."""
@ -74,7 +76,7 @@ msg_topic.label:
msg_publish_at.desc:
"""Message publish time, a millisecond precision Unix epoch timestamp."""
msg_publish_at.label:
"""Message Publish Time."""
"""Message Publish Time"""
msg_from_clientid.desc:
"""Message publisher's client ID."""
@ -86,6 +88,16 @@ msg_from_username.desc:
msg_from_username.label:
"""Message Publisher's Username"""
msg_inserted_at.desc:
"""A nanosecond precision Unix epoch timestamp at which a message was inserted to In-flight / Mqueue."""
msg_inserted_at.label:
"""Message Insertion Time"""
msg_mqueue_priority.desc:
"""Message Mqueue Priority."""
msg_mqueue_priority.label:
"""Message Mqueue Priority"""
subscribe.desc:
"""Subscribe"""
subscribe.label: