From cb5fdb3c795be9ef9de767a8d3d13d06870c0958 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 18 Mar 2024 21:20:28 +0200 Subject: [PATCH] fix: rework In-flight / Mqueue API - use timestamp 'position' to find the next chunk of data - add 'start' position to response meta - sort In-flight messages by insertion time - sort Mqueue messages by priority --- apps/emqx/src/emqx_inflight.erl | 47 +---- apps/emqx/src/emqx_mqueue.erl | 150 ++++++++++----- apps/emqx/src/emqx_pqueue.erl | 13 ++ apps/emqx/src/emqx_session_mem.erl | 86 +++++++-- apps/emqx/test/emqx_inflight_SUITE.erl | 70 ------- apps/emqx/test/emqx_mqueue_SUITE.erl | 174 ++++++++++++++---- apps/emqx/test/emqx_session_mem_SUITE.erl | 85 ++++++++- .../src/emqx_dashboard_swagger.erl | 35 ++-- apps/emqx_management/include/emqx_mgmt.hrl | 3 - apps/emqx_management/src/emqx_mgmt_api.erl | 44 ++--- .../src/emqx_mgmt_api_clients.erl | 102 +++++++--- .../test/emqx_mgmt_api_clients_SUITE.erl | 95 ++++++---- apps/emqx_utils/include/emqx_message.hrl | 3 + changes/ce/feat-12561.en.md | 19 +- rel/i18n/emqx_mgmt_api_clients.hocon | 28 ++- 15 files changed, 597 insertions(+), 357 deletions(-) diff --git a/apps/emqx/src/emqx_inflight.erl b/apps/emqx/src/emqx_inflight.erl index 1f4433e57..c342a846f 100644 --- a/apps/emqx/src/emqx_inflight.erl +++ b/apps/emqx/src/emqx_inflight.erl @@ -36,8 +36,7 @@ max_size/1, is_full/1, is_empty/1, - window/1, - query/2 + window/1 ]). -export_type([inflight/0]). @@ -139,47 +138,3 @@ size(?INFLIGHT(Tree)) -> -spec max_size(inflight()) -> non_neg_integer(). max_size(?INFLIGHT(MaxSize, _Tree)) -> MaxSize. - --spec query(inflight(), #{continuation => Cont, limit := L}) -> - {[{key(), term()}], #{continuation := Cont, count := C}} -when - Cont :: none | end_of_data | key(), - L :: non_neg_integer(), - C :: non_neg_integer(). -query(?INFLIGHT(Tree), #{limit := Limit} = Pager) -> - Count = gb_trees:size(Tree), - ContKey = maps:get(continuation, Pager, none), - {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit), - {List, #{continuation => NextCont, count => Count}}. - -iterator_from(none, Tree) -> - gb_trees:iterator(Tree); -iterator_from(ContKey, Tree) -> - It = gb_trees:iterator_from(ContKey, Tree), - case gb_trees:next(It) of - {ContKey, _Val, ItNext} -> ItNext; - _ -> It - end. - -sublist(_It, 0) -> - {[], none}; -sublist(It, Len) -> - {ListAcc, HasNext} = sublist(It, Len, []), - {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. - -sublist(It, 0, Acc) -> - {Acc, gb_trees:next(It) =/= none}; -sublist(It, Len, Acc) -> - case gb_trees:next(It) of - none -> - {Acc, false}; - {Key, Val, ItNext} -> - sublist(ItNext, Len - 1, [{Key, Val} | Acc]) - end. - -next_cont(_Acc, false) -> - end_of_data; -next_cont([{LastKey, _LastVal} | _Acc], _HasNext) -> - LastKey; -next_cont([], _HasNext) -> - end_of_data. diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index e3e54cdc9..8b63a8a48 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -98,6 +98,7 @@ -define(HIGHEST_PRIORITY, infinity). -define(MAX_LEN_INFINITY, 0). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]). +-define(INSERT_TS, mqueue_insert_ts). -record(shift_opts, { multiplier :: non_neg_integer(), @@ -172,54 +173,82 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) -> MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff} end. --spec query(mqueue(), #{continuation => ContMsgId, limit := L}) -> - {[message()], #{continuation := ContMsgId, count := C}} +-spec query(mqueue(), #{position => Pos, limit := Limit}) -> + {[message()], #{position := Pos, start := Pos}} when - ContMsgId :: none | end_of_data | binary(), - C :: non_neg_integer(), - L :: non_neg_integer(). -query(MQ, #{limit := Limit} = Pager) -> - ContMsgId = maps:get(continuation, Pager, none), - {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit), - {List, #{continuation => NextCont, count => len(MQ)}}. + Pos :: none | {integer(), priority()}, + Limit :: non_neg_integer(). +query(MQ, #{limit := Limit} = PagerParams) -> + Pos = maps:get(position, PagerParams, none), + PQsList = ?PQUEUE:to_queues_list(MQ#mqueue.q), + {Msgs, NxtPos} = sublist(skip_until(PQsList, Pos), Limit, [], Pos), + {Msgs, #{position => NxtPos, start => first_msg_pos(PQsList)}}. -skip_until(MQ, none = _MsgId) -> - MQ; -skip_until(MQ, MsgId) -> - do_skip_until(MQ, MsgId). - -do_skip_until(MQ, MsgId) -> - case out(MQ) of - {empty, MQ} -> - MQ; - {{value, #message{id = MsgId}}, Q1} -> - Q1; - {{value, _Msg}, Q1} -> - do_skip_until(Q1, MsgId) +first_msg_pos([]) -> + none; +first_msg_pos([{Prio, PQ} | T]) -> + case ?PQUEUE:out(PQ) of + {empty, _PQ} -> + first_msg_pos(T); + {{value, Msg}, _Q} -> + {insert_ts(Msg), Prio} end. -sublist(_MQ, 0) -> - {[], none}; -sublist(MQ, Len) -> - {ListAcc, HasNext} = sublist(MQ, Len, []), - {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. - -sublist(MQ, 0, Acc) -> - {Acc, element(1, out(MQ)) =/= empty}; -sublist(MQ, Len, Acc) -> - case out(MQ) of - {empty, _MQ} -> - {Acc, false}; - {{value, Msg}, Q1} -> - sublist(Q1, Len - 1, [Msg | Acc]) +skip_until(PQsList, none = _Pos) -> + PQsList; +skip_until(PQsList, {MsgPos, PrioPos}) -> + case skip_until_prio(PQsList, PrioPos) of + [{Prio, PQ} | T] -> + PQ1 = skip_until_msg(PQ, MsgPos), + [{Prio, PQ1} | T]; + [] -> + [] end. -next_cont(_Acc, false) -> - end_of_data; -next_cont([#message{id = Id} | _Acc], _HasNext) -> - Id; -next_cont([], _HasNext) -> - end_of_data. +skip_until_prio(PQsList, PrioPos) -> + lists:dropwhile(fun({Prio, _PQ}) -> Prio > PrioPos end, PQsList). + +skip_until_msg(PQ, MsgPos) -> + case ?PQUEUE:out(PQ) of + {empty, PQ1} -> + PQ1; + {{value, Msg}, PQ1} -> + case insert_ts(Msg) > MsgPos of + true -> PQ; + false -> skip_until_msg(PQ1, MsgPos) + end + end. + +sublist(PQs, Len, Acc, LastPosPrio) when PQs =:= []; Len =:= 0 -> + {Acc, LastPosPrio}; +sublist([{Prio, PQ} | T], Len, Acc, LastPosPrio) -> + {SingleQAcc, SingleQSize} = sublist_single_pq(Prio, PQ, Len, [], 0), + Acc1 = Acc ++ lists:reverse(SingleQAcc), + NxtPosPrio = + case SingleQAcc of + [H | _] -> {insert_ts(H), Prio}; + [] -> LastPosPrio + end, + case SingleQSize =:= Len of + true -> + {Acc1, NxtPosPrio}; + false -> + sublist(T, Len - SingleQSize, Acc1, NxtPosPrio) + end. + +sublist_single_pq(_Prio, _PQ, 0, Acc, AccSize) -> + {Acc, AccSize}; +sublist_single_pq(Prio, PQ, Len, Acc, AccSize) -> + case ?PQUEUE:out(0, PQ) of + {empty, _PQ} -> + {Acc, AccSize}; + {{value, Msg}, PQ1} -> + Msg1 = with_prio(Msg, Prio), + sublist_single_pq(Prio, PQ1, Len - 1, [Msg1 | Acc], AccSize + 1) + end. + +with_prio(#message{extra = Extra} = Msg, Prio) -> + Msg#message{extra = Extra#{mqueue_priority => Prio}}. to_list(MQ, Acc) -> case out(MQ) of @@ -256,14 +285,15 @@ in( ) -> Priority = get_priority(Topic, PTab, Dp), PLen = ?PQUEUE:plen(Priority, Q), + Msg1 = with_ts(Msg), case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of true -> %% reached max length, drop the oldest message {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q), - Q2 = ?PQUEUE:in(Msg, Priority, Q1), - {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; + Q2 = ?PQUEUE:in(Msg1, Priority, Q1), + {without_ts(DroppedMsg), MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> - {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} + {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg1, Priority, Q)}} end. -spec out(mqueue()) -> {empty | {value, message()}, mqueue()}. @@ -280,7 +310,7 @@ out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts last_prio = Prio, p_credit = get_credits(Prio, ShiftOpts) }, - {{value, Val}, MQ1}; + {{value, without_ts(Val)}, MQ1}; out(MQ = #mqueue{q = Q, p_credit = 0}) -> MQ1 = MQ#mqueue{ q = ?PQUEUE:shift(Q), @@ -288,8 +318,12 @@ out(MQ = #mqueue{q = Q, p_credit = 0}) -> }, out(MQ1); out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) -> - {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}. + {R, Q2} = + case ?PQUEUE:out(Q) of + {{value, Val}, Q1} -> {{value, without_ts(Val)}, Q1}; + Other -> Other + end, + {R, MQ#mqueue{q = Q2, len = Len - 1, p_credit = Cnt - 1}}. get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -359,3 +393,23 @@ p_table(PTab = #{}) -> ); p_table(PTab) -> PTab. + +%% This is used to sort/traverse messages in query/2 +with_ts(#message{extra = Extra} = Msg) -> + TsNano = erlang:system_time(nanosecond), + Extra1 = + case is_map(Extra) of + true -> Extra; + %% extra field has not being used before EMQX 5.4.0 + %% and defaulted to an empty list, + %% if it's not a map it's safe to overwrite it + false -> #{} + end, + Msg#message{extra = Extra1#{?INSERT_TS => TsNano}}. + +without_ts(#message{extra = Extra} = Msg) -> + Msg#message{extra = maps:remove(?INSERT_TS, Extra)}; +without_ts(Msg) -> + Msg. + +insert_ts(#message{extra = #{?INSERT_TS := Ts}}) -> Ts. diff --git a/apps/emqx/src/emqx_pqueue.erl b/apps/emqx/src/emqx_pqueue.erl index 1b3b2a463..19b7c3df9 100644 --- a/apps/emqx/src/emqx_pqueue.erl +++ b/apps/emqx/src/emqx_pqueue.erl @@ -46,6 +46,7 @@ len/1, plen/2, to_list/1, + to_queues_list/1, from_list/1, in/2, in/3, @@ -121,6 +122,18 @@ to_list({pqueue, Queues}) -> {0, V} <- to_list(Q) ]. +-spec to_queues_list(pqueue()) -> [{priority(), squeue()}]. +to_queues_list({queue, _In, _Out, _Len} = Squeue) -> + [{0, Squeue}]; +to_queues_list({pqueue, Queues}) -> + lists:sort( + fun + ({infinity = _P1, _}, {_P2, _}) -> true; + ({P1, _}, {P2, _}) -> P1 >= P2 + end, + [{maybe_negate_priority(P), Q} || {P, Q} <- Queues] + ). + -spec from_list([{priority(), any()}]) -> pqueue(). from_list(L) -> lists:foldl(fun({P, E}, Q) -> in(E, P, Q) end, new(), L). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index dbb440f41..2e12d330c 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -146,6 +146,8 @@ -define(DEFAULT_BATCH_N, 1000). +-define(INFLIGHT_INSERT_TS, inflight_insert_ts). + %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- @@ -269,8 +271,7 @@ info(inflight_cnt, #session{inflight = Inflight}) -> info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) -> - {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams), - {[I#inflight_data.message || {_, I} <- InflightList], Meta}; + inflight_query(Inflight, PagerParams); info(retry_interval, #session{retry_interval = Interval}) -> Interval; info(mqueue, #session{mqueue = MQueue}) -> @@ -396,7 +397,7 @@ puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), Session1 = Session#session{inflight = Inflight1}, {ok, Replies, Session2} = dequeue(ClientInfo, Session1), - {ok, Msg, Replies, Session2}; + {ok, without_inflight_insert_ts(Msg), Replies, Session2}; {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -415,7 +416,7 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) -> {value, #inflight_data{phase = wait_ack, message = Msg} = Data} -> Update = Data#inflight_data{phase = wait_comp}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), - {ok, Msg, Session#session{inflight = Inflight1}}; + {ok, without_inflight_insert_ts(Msg), Session#session{inflight = Inflight1}}; {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -451,7 +452,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), Session1 = Session#session{inflight = Inflight1}, {ok, Replies, Session2} = dequeue(ClientInfo, Session1), - {ok, Msg, Replies, Session2}; + {ok, without_inflight_insert_ts(Msg), Replies, Session2}; {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -636,7 +637,7 @@ do_retry_delivery( _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> - Msg1 = emqx_message:set_flag(dup, true, Msg), + Msg1 = without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg)), Update = Data#inflight_data{message = Msg1, timestamp = Now}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {[{PacketId, Msg1} | Acc], Inflight1} @@ -728,7 +729,7 @@ replay(ClientInfo, Session) -> ({PacketId, #inflight_data{phase = wait_comp}}) -> {pubrel, PacketId}; ({PacketId, #inflight_data{message = Msg}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)} + {PacketId, without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg))} end, emqx_inflight:to_list(Session#session.inflight) ), @@ -775,7 +776,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> %% If the Client's Session terminates before the Client reconnects, %% the Server MUST NOT send the Application Message to any other %% subscribed Client [MQTT-4.8.2-5]. - {true, Msg}; + {true, without_inflight_insert_ts(Msg)}; ({_PacketId, #inflight_data{}}) -> false end, @@ -798,22 +799,83 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %% Helper functions %%-------------------------------------------------------------------- --compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}). +-compile( + {inline, [ + sort_fun/2, batch_n/1, inflight_insert_ts/1, without_inflight_insert_ts/1, with_ts/1, age/2 + ]} +). sort_fun({_, A}, {_, B}) -> A#inflight_data.timestamp =< B#inflight_data.timestamp. +query_sort_fun({_, #inflight_data{message = A}}, {_, #inflight_data{message = B}}) -> + inflight_insert_ts(A) =< inflight_insert_ts(B). + +-spec inflight_query(emqx_inflight:inflight(), #{ + position => integer() | none, limit := pos_integer() +}) -> + {[emqx_types:message()], #{position := integer() | none, start := integer() | none}}. +inflight_query(Inflight, #{limit := Limit} = PagerParams) -> + InflightL = emqx_inflight:to_list(fun query_sort_fun/2, Inflight), + StartPos = + case InflightL of + [{_, #inflight_data{message = FirstM}} | _] -> inflight_insert_ts(FirstM); + [] -> none + end, + Position = maps:get(position, PagerParams, none), + InflightMsgs = sublist_from_pos(InflightL, Position, Limit), + NextPos = + case InflightMsgs of + [_ | _] = L -> + inflight_insert_ts(lists:last(L)); + [] -> + Position + end, + {InflightMsgs, #{start => StartPos, position => NextPos}}. + +sublist_from_pos(InflightList, none = _Position, Limit) -> + inflight_msgs_sublist(InflightList, Limit); +sublist_from_pos(InflightList, Position, Limit) -> + Inflight = lists:dropwhile( + fun({_, #inflight_data{message = M}}) -> + inflight_insert_ts(M) =< Position + end, + InflightList + ), + inflight_msgs_sublist(Inflight, Limit). + +%% Small optimization to get sublist and drop keys in one traversal +inflight_msgs_sublist([{_Key, #inflight_data{message = Msg}} | T], Limit) when Limit > 0 -> + [Msg | inflight_msgs_sublist(T, Limit - 1)]; +inflight_msgs_sublist(_, _) -> + []. + +inflight_insert_ts(#message{extra = #{?INFLIGHT_INSERT_TS := Ts}}) -> Ts. + +without_inflight_insert_ts(#message{extra = Extra} = Msg) -> + Msg#message{extra = maps:remove(?INFLIGHT_INSERT_TS, Extra)}. + batch_n(Inflight) -> case emqx_inflight:max_size(Inflight) of 0 -> ?DEFAULT_BATCH_N; Sz -> Sz - emqx_inflight:size(Inflight) end. -with_ts(Msg) -> +with_ts(#message{extra = Extra} = Msg) -> + InsertTsNano = erlang:system_time(nanosecond), + %% This is used to sort/traverse messages in inflight_query/2 + Extra1 = + case is_map(Extra) of + true -> Extra; + %% extra field has not being used before EMQX 5.4.0 and defaulted to an empty list, + %% if it's not a map it's safe to overwrite it + false -> #{} + end, + Msg1 = Msg#message{extra = Extra1#{?INFLIGHT_INSERT_TS => InsertTsNano}}, #inflight_data{ phase = wait_ack, - message = Msg, - timestamp = erlang:system_time(millisecond) + message = Msg1, + timestamp = erlang:convert_time_unit(InsertTsNano, nanosecond, millisecond) }. age(Now, Ts) -> Now - Ts. diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl index a220129af..d8dd8e969 100644 --- a/apps/emqx/test/emqx_inflight_SUITE.erl +++ b/apps/emqx/test/emqx_inflight_SUITE.erl @@ -126,73 +126,3 @@ t_to_list(_) -> ), ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)], ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)). - -t_query(_) -> - EmptyInflight = emqx_inflight:new(500), - ?assertMatch( - {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50}) - ), - ?assertMatch( - {[], #{continuation := end_of_data}}, - emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50}) - ), - ?assertMatch( - {[], #{continuation := end_of_data}}, - emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50}) - ), - - Inflight = lists:foldl( - fun(Seq, QAcc) -> - emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc) - end, - EmptyInflight, - lists:reverse(lists:seq(1, 114)) - ), - - LastCont = lists:foldl( - fun(PageSeq, Cont) -> - Limit = 10, - PagerParams = #{continuation => Cont, limit => Limit}, - {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams), - ?assertEqual(10, length(Page)), - ExpFirst = PageSeq * Limit - Limit + 1, - ExpLast = PageSeq * Limit, - ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)), - ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)), - ?assertMatch( - #{count := 114, continuation := IntCont} when is_integer(IntCont), - Meta - ), - NextCont - end, - none, - lists:seq(1, 11) - ), - {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{ - continuation => LastCont, limit => 10 - }), - ?assertEqual(4, length(LastPartialPage)), - ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)), - ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)), - ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), - - ?assertMatch( - {[], #{continuation := end_of_data}}, - emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10}) - ), - - {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}), - ?assertEqual(114, length(LargePage)), - ?assertEqual({1, <<"1">>}, hd(LargePage)), - ?assertEqual({114, <<"114">>}, lists:last(LargePage)), - ?assertMatch(#{continuation := end_of_data}, LargeMeta), - - {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}), - ?assertEqual(114, length(FullPage)), - ?assertEqual({1, <<"1">>}, hd(FullPage)), - ?assertEqual({114, <<"114">>}, lists:last(FullPage)), - ?assertMatch(#{continuation := end_of_data}, FullMeta), - - {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}), - ?assertEqual([], EmptyPage), - ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). diff --git a/apps/emqx/test/emqx_mqueue_SUITE.erl b/apps/emqx/test/emqx_mqueue_SUITE.erl index f3e1629a7..8128c5e1c 100644 --- a/apps/emqx/test/emqx_mqueue_SUITE.erl +++ b/apps/emqx/test/emqx_mqueue_SUITE.erl @@ -284,13 +284,15 @@ t_dropped(_) -> t_query(_) -> EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}), - ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})), - ?assertMatch( - {[], #{continuation := end_of_data}}, - ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50}) + ?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})), + RandPos = {erlang:system_time(nanosecond), 0}, + ?assertEqual( + {[], #{position => RandPos, start => none}}, + ?Q:query(EmptyQ, #{position => RandPos, limit => 50}) ), - ?assertMatch( - {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50}) + ?assertEqual( + {[], #{position => none, start => none}}, + ?Q:query(EmptyQ, #{continuation => none, limit => 50}) ), Q = lists:foldl( @@ -303,52 +305,146 @@ t_query(_) -> lists:seq(1, 114) ), - LastCont = lists:foldl( - fun(PageSeq, Cont) -> + {LastPos, LastStart} = lists:foldl( + fun(PageSeq, {Pos, PrevStart}) -> Limit = 10, - PagerParams = #{continuation => Cont, limit => Limit}, - {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams), + PagerParams = #{position => Pos, limit => Limit}, + {Page, #{position := NextPos, start := Start}} = ?Q:query(Q, PagerParams), ?assertEqual(10, length(Page)), ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), ExpLastPayload = integer_to_binary(PageSeq * Limit), - ?assertEqual( - ExpFirstPayload, - emqx_message:payload(lists:nth(1, Page)), - #{page_seq => PageSeq, page => Page, meta => Meta} - ), - ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))), - ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta), - NextCont + FirstMsg = lists:nth(1, Page), + LastMsg = lists:nth(10, Page), + ?assertEqual(ExpFirstPayload, emqx_message:payload(FirstMsg)), + ?assertEqual(ExpLastPayload, emqx_message:payload(LastMsg)), + %% start value must not change as Mqueue is not modified during traversal + NextStart = + case PageSeq of + 1 -> + ?assertEqual({mqueue_ts(FirstMsg), 0}, Start), + Start; + _ -> + ?assertEqual(PrevStart, Start), + PrevStart + end, + {NextPos, NextStart} end, - none, + {none, none}, lists:seq(1, 11) ), - {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}), + + {LastPartialPage, #{position := FinalPos} = LastMeta} = ?Q:query(Q, #{ + position => LastPos, limit => 10 + }), + LastMsg = lists:nth(4, LastPartialPage), ?assertEqual(4, length(LastPartialPage)), ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))), - ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))), - ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), - - ?assertMatch( - {[], #{continuation := end_of_data}}, - ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10}) + ?assertEqual(<<"114">>, emqx_message:payload(LastMsg)), + ?assertEqual(#{position => {mqueue_ts(LastMsg), 0}, start => LastStart}, LastMeta), + ?assertEqual( + {[], #{start => LastStart, position => FinalPos}}, + ?Q:query(Q, #{position => FinalPos, limit => 10}) ), - {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}), + {LargePage, LargeMeta} = ?Q:query(Q, #{position => none, limit => 1000}), ?assertEqual(114, length(LargePage)), ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))), ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))), - ?assertMatch(#{continuation := end_of_data}, LargeMeta), + ?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta), - {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}), - ?assertEqual(114, length(FullPage)), - ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))), - ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))), - ?assertMatch(#{continuation := end_of_data}, FullMeta), + {FullPage, FullMeta} = ?Q:query(Q, #{position => none, limit => 114}), + ?assertEqual(LargePage, FullPage), + ?assertEqual(LargeMeta, FullMeta), - {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}), - ?assertEqual([], EmptyPage), - ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). + {_, Q1} = emqx_mqueue:out(Q), + {PageAfterRemove, #{start := StartAfterRemove}} = ?Q:query(Q1, #{position => none, limit => 10}), + ?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))), + ?assertEqual(StartAfterRemove, {mqueue_ts(hd(PageAfterRemove)), 0}). + +t_query_with_priorities(_) -> + Priorities = #{<<"t/infinity">> => infinity, <<"t/10">> => 10, <<"t/5">> => 5}, + EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true, priorities => Priorities}), + + ?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})), + RandPos = {erlang:system_time(nanosecond), 0}, + ?assertEqual( + {[], #{position => RandPos, start => none}}, + ?Q:query(EmptyQ, #{position => RandPos, limit => 50}) + ), + ?assertEqual( + {[], #{position => none, start => none}}, + ?Q:query(EmptyQ, #{continuation => none, limit => 50}) + ), + + {Q, ExpMsgsAcc} = lists:foldl( + fun(Topic, {QAcc, MsgsAcc}) -> + {TopicQ, TopicMsgs} = + lists:foldl( + fun(Seq, {TopicQAcc, TopicMsgsAcc}) -> + Payload = <>, + Msg = emqx_message:make(Topic, Payload), + {_, TopicQAcc1} = ?Q:in(Msg, TopicQAcc), + {TopicQAcc1, [Msg | TopicMsgsAcc]} + end, + {QAcc, []}, + lists:seq(1, 10) + ), + {TopicQ, [lists:reverse(TopicMsgs) | MsgsAcc]} + end, + {EmptyQ, []}, + [<<"t/test">>, <<"t/5">>, <<"t/infinity">>, <<"t/10">>] + ), + + %% Manual resorting from the highest to the lowest priority + [ExpMsgsPrio0, ExpMsgsPrio5, ExpMsgsPrioInf, ExpMsgsPrio10] = lists:reverse(ExpMsgsAcc), + ExpMsgs = ExpMsgsPrioInf ++ ExpMsgsPrio10 ++ ExpMsgsPrio5 ++ ExpMsgsPrio0, + {AllMsgs, #{start := StartPos, position := Pos}} = ?Q:query(Q, #{position => none, limit => 40}), + ?assertEqual(40, length(AllMsgs)), + ?assertEqual(ExpMsgs, with_empty_extra(AllMsgs)), + FirstMsg = hd(AllMsgs), + LastMsg = lists:last(AllMsgs), + ?assertEqual(<<"t/infinity_1">>, emqx_message:payload(FirstMsg)), + ?assertEqual(StartPos, {mqueue_ts(FirstMsg), infinity}), + ?assertEqual(<<"t/test_10">>, emqx_message:payload(LastMsg)), + ?assertMatch({_, 0}, Pos), + ?assertEqual(Pos, {mqueue_ts(LastMsg), mqueue_prio(LastMsg)}), + + Pos5 = {mqueue_ts(lists:nth(5, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))}, + LastInfPos = {mqueue_ts(lists:nth(10, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))}, + + {MsgsPrioInfTo10, #{start := StartPos, position := PosPrio10Msg5}} = ?Q:query(Q, #{ + position => Pos5, limit => 10 + }), + ?assertEqual(10, length(MsgsPrioInfTo10)), + ?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo10))), + ?assertEqual(<<"t/10_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo10))), + ?assertEqual(PosPrio10Msg5, { + mqueue_ts(lists:last(MsgsPrioInfTo10)), mqueue_prio(lists:last(MsgsPrioInfTo10)) + }), + + {MsgsPrioInfTo5, #{start := StartPos, position := PosPrio5Msg5}} = ?Q:query(Q, #{ + position => Pos5, limit => 20 + }), + ?assertEqual(20, length(MsgsPrioInfTo5)), + ?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo5))), + ?assertEqual(<<"t/5_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo5))), + ?assertEqual(PosPrio5Msg5, { + mqueue_ts(lists:last(MsgsPrioInfTo5)), mqueue_prio(lists:last(MsgsPrioInfTo5)) + }), + + {MsgsPrio10, #{start := StartPos, position := PosPrio10}} = ?Q:query(Q, #{ + position => LastInfPos, limit => 10 + }), + ?assertEqual(ExpMsgsPrio10, with_empty_extra(MsgsPrio10)), + ?assertEqual(10, length(MsgsPrio10)), + ?assertEqual(<<"t/10_1">>, emqx_message:payload(hd(MsgsPrio10))), + ?assertEqual(<<"t/10_10">>, emqx_message:payload(lists:last(MsgsPrio10))), + ?assertEqual(PosPrio10, {mqueue_ts(lists:last(MsgsPrio10)), mqueue_prio(lists:last(MsgsPrio10))}), + + {MsgsPrio10To5, #{start := StartPos, position := _}} = ?Q:query(Q, #{ + position => LastInfPos, limit => 20 + }), + ?assertEqual(ExpMsgsPrio10 ++ ExpMsgsPrio5, with_empty_extra(MsgsPrio10To5)). conservation_prop() -> ?FORALL( @@ -413,3 +509,9 @@ drain(Q) -> {{value, #message{topic = T, payload = P}}, Q1} -> [{T, P} | drain(Q1)] end. + +mqueue_ts(#message{extra = #{mqueue_insert_ts := Ts}}) -> Ts. +mqueue_prio(#message{extra = #{mqueue_priority := Prio}}) -> Prio. + +with_empty_extra(Msgs) -> + [M#message{extra = #{}} || M <- Msgs]. diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index ec98388d7..75b6545fa 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -115,6 +116,80 @@ t_session_stats(_) -> maps:from_list(Stats) ). +t_session_inflight_query(_) -> + EmptyInflight = emqx_inflight:new(500), + Session = session(#{inflight => EmptyInflight}), + EmptyQueryResMeta = {[], #{position => none, start => none}}, + ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)), + ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)), + RandPos = erlang:system_time(nanosecond), + ?assertEqual({[], #{position => RandPos, start => none}}, inflight_query(Session, RandPos, 10)), + Inflight = lists:foldl( + fun(Seq, Acc) -> + Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, integer_to_binary(Seq)), + emqx_inflight:insert(Seq, emqx_session_mem:with_ts(Msg), Acc) + end, + EmptyInflight, + lists:seq(1, 114) + ), + + Session1 = session(#{inflight => Inflight}), + + {LastPos, LastStart} = lists:foldl( + fun(PageSeq, {Pos, PrevStart}) -> + Limit = 10, + {Page, #{position := NextPos, start := Start}} = inflight_query(Session1, Pos, Limit), + ?assertEqual(10, length(Page)), + ExpFirst = PageSeq * Limit - Limit + 1, + ExpLast = PageSeq * Limit, + FirstMsg = lists:nth(1, Page), + LastMsg = lists:nth(10, Page), + ?assertEqual(integer_to_binary(ExpFirst), emqx_message:payload(FirstMsg)), + ?assertEqual(integer_to_binary(ExpLast), emqx_message:payload(LastMsg)), + %% start value must not change as Inflight is not modified during traversal + NextStart = + case PageSeq of + 1 -> + ?assertEqual(inflight_ts(FirstMsg), Start), + Start; + _ -> + ?assertEqual(PrevStart, Start), + PrevStart + end, + ?assertEqual(inflight_ts(LastMsg), NextPos), + {NextPos, NextStart} + end, + {none, none}, + lists:seq(1, 11) + ), + {LastPartialPage, #{position := FinalPos} = LastMeta} = inflight_query( + Session1, LastPos, 10 + ), + LastMsg = lists:nth(4, LastPartialPage), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))), + ?assertEqual(<<"114">>, emqx_message:payload(LastMsg)), + ?assertEqual(#{position => inflight_ts(LastMsg), start => LastStart}, LastMeta), + ?assertEqual( + {[], #{start => LastStart, position => FinalPos}}, + inflight_query(Session1, FinalPos, 10) + ), + + {LargePage, LargeMeta} = inflight_query(Session1, none, 1000), + ?assertEqual(114, length(LargePage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))), + ?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta), + + {FullPage, FullMeta} = inflight_query(Session1, none, 114), + ?assertEqual(LargePage, FullPage), + ?assertEqual(LargeMeta, FullMeta), + + Session2 = session(#{inflight => emqx_inflight:delete(1, Inflight)}), + {PageAfterRemove, #{start := StartAfterRemove}} = inflight_query(Session2, none, 10), + ?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))), + ?assertEqual(StartAfterRemove, inflight_ts(hd(PageAfterRemove))). + %%-------------------------------------------------------------------- %% Test cases for sub/unsub %%-------------------------------------------------------------------- @@ -274,9 +349,10 @@ t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), + Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, Msg), emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, undefined, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session), + {ok, Msg, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)). t_pubcomp_error_packetid_in_use(_) -> @@ -598,3 +674,8 @@ set_duplicate_pub({Id, Msg}) -> get_packet_id({Id, _}) -> Id. + +inflight_query(Session, Pos, Limit) -> + emqx_session_mem:info({inflight_msgs, #{position => Pos, limit => Limit}}, Session). + +inflight_ts(#message{extra = #{inflight_insert_ts := Ts}}) -> Ts. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 38d5df662..a6038bcb7 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -178,36 +178,27 @@ fields(hasnext) -> >>, Meta = #{desc => Desc, required => true}, [{hasnext, hoconsc:mk(boolean(), Meta)}]; -fields('after') -> - Desc = << - "The value of \"last\" field returned in the previous response. It can then be used" - " in subsequent requests to get the next chunk of results.
" - "It is used instead of \"page\" parameter to traverse volatile data.
" - "Can be omitted or set to \"none\" to get the first chunk of data.
" - "\last\" = end_of_data\" is returned, if there is no more data.
" - "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" - " error response." - >>, - Meta = #{ - in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">> - }, - [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; -fields(last) -> +fields(position) -> Desc = << "An opaque token that can then be in subsequent requests to get " - " the next chunk of results: \"?after={last}\"
" - "if there is no more data, \"last\" = end_of_data\" is returned.
" - "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" - " error response." + " the next chunk of results: \"?position={prev_response.meta.position}\"
" + "It is used instead of \"page\" parameter to traverse highly volatile data.
" + "Can be omitted or set to \"none\" to get the first chunk of data." >>, Meta = #{ - desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">> + in => query, desc => Desc, required => false, example => <<"none">> }, - [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; + [{position, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; +fields(start) -> + Desc = <<"The position of the current first element of the data collection.">>, + Meta = #{ + desc => Desc, required => true, example => <<"none">> + }, + [{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}]; fields(meta) -> fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext); fields(continuation_meta) -> - fields(last) ++ fields(count). + fields(start) ++ fields(position). -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema(). schema_with_example(Type, Example) -> diff --git a/apps/emqx_management/include/emqx_mgmt.hrl b/apps/emqx_management/include/emqx_mgmt.hrl index a802bad21..8ad1cd871 100644 --- a/apps/emqx_management/include/emqx_mgmt.hrl +++ b/apps/emqx_management/include/emqx_mgmt.hrl @@ -15,6 +15,3 @@ %%-------------------------------------------------------------------- -define(DEFAULT_ROW_LIMIT, 100). - --define(URL_PARAM_INTEGER, url_param_integer). --define(URL_PARAM_BINARY, url_param_binary). diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 1b4e9a255..39b72ca89 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -39,7 +39,6 @@ -export([ parse_pager_params/1, parse_cont_pager_params/2, - encode_cont_pager_params/2, parse_qstring/2, init_query_result/0, init_query_state/5, @@ -138,32 +137,18 @@ page(Params) -> limit(Params) when is_map(Params) -> maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()). -continuation(Params, Encoding) -> +position(Params, Decoder) -> try - decode_continuation(maps:get(<<"after">>, Params, none), Encoding) + decode_position(maps:get(<<"position">>, Params, none), Decoder) catch _:_ -> error end. -decode_continuation(none, _Encoding) -> +decode_position(none, _Decoder) -> none; -decode_continuation(end_of_data, _Encoding) -> - %% Clients should not send "after=end_of_data" back to the server - error; -decode_continuation(Cont, ?URL_PARAM_INTEGER) -> - binary_to_integer(Cont); -decode_continuation(Cont, ?URL_PARAM_BINARY) -> - emqx_utils:hexstr_to_bin(Cont). - -encode_continuation(none, _Encoding) -> - none; -encode_continuation(end_of_data, _Encoding) -> - end_of_data; -encode_continuation(Cont, ?URL_PARAM_INTEGER) -> - integer_to_binary(Cont); -encode_continuation(Cont, ?URL_PARAM_BINARY) -> - emqx_utils:bin_to_hexstr(Cont, lower). +decode_position(Pos, Decoder) -> + Decoder(Pos). %%-------------------------------------------------------------------- %% Node Query @@ -670,25 +655,18 @@ parse_pager_params(Params) -> false end. --spec parse_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) -> - #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false. -parse_cont_pager_params(Params, Encoding) -> - Cont = continuation(Params, Encoding), +-spec parse_cont_pager_params(map(), fun((binary()) -> term())) -> + #{limit := pos_integer(), position := none | term()} | false. +parse_cont_pager_params(Params, PositionDecoder) -> + Pos = position(Params, PositionDecoder), Limit = b2i(limit(Params)), - case Limit > 0 andalso Cont =/= error of + case Limit > 0 andalso Pos =/= error of true -> - #{continuation => Cont, limit => Limit}; + #{position => Pos, limit => Limit}; false -> false end. --spec encode_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) -> map(). -encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) -> - Meta1 = maps:remove(continuation, Meta), - Meta1#{last => encode_continuation(Cont, ContEncoding)}; -encode_cont_pager_params(Meta, _ContEncoding) -> - Meta. - %%-------------------------------------------------------------------- %% Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index cc50a9178..e011f609b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -413,11 +413,11 @@ schema("/clients/:clientid/keepalive") -> } }; schema("/clients/:clientid/mqueue_messages") -> - ContExample = <<"AAYS53qRa0n07AAABFIACg">>, + ContExample = <<"1710785444656449826_10">>, RespSchema = ?R_REF(mqueue_messages), client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema); schema("/clients/:clientid/inflight_messages") -> - ContExample = <<"10">>, + ContExample = <<"1710785444656449826">>, RespSchema = ?R_REF(inflight_messages), client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema); schema("/sessions_count") -> @@ -656,7 +656,7 @@ fields(unsubscribe) -> ]; fields(mqueue_messages) -> [ - {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})}, + {data, hoconsc:mk(hoconsc:array(?REF(mqueue_message)), #{desc => ?DESC(mqueue_msgs_list)})}, {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})} ]; fields(inflight_messages) -> @@ -672,8 +672,18 @@ fields(message) -> {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})}, {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})}, {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})}, - {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})} + {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}, + {inserted_at, hoconsc:mk(binary(), #{desc => ?DESC(msg_inserted_at)})} ]; +fields(mqueue_message) -> + fields(message) ++ + [ + {mqueue_priority, + hoconsc:mk( + hoconsc:union([integer(), infinity]), + #{desc => ?DESC(msg_mqueue_priority)} + )} + ]; fields(requested_client_fields) -> %% NOTE: some Client fields actually returned in response are missing in schema: %% enable_authn, is_persistent, listener, peerport @@ -920,7 +930,7 @@ client_msgs_schema(OpId, Desc, ContExample, RespSchema) -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_example(RespSchema, #{ - <<"data">> => [message_example()], + <<"data">> => [message_example(OpId)], <<"meta">> => #{ <<"count">> => 100, <<"last">> => ContExample @@ -963,7 +973,7 @@ client_msgs_params() -> >>, validator => fun max_bytes_validator/1 })}, - hoconsc:ref(emqx_dashboard_swagger, 'after'), + hoconsc:ref(emqx_dashboard_swagger, position), hoconsc:ref(emqx_dashboard_swagger, limit) ]. @@ -1200,9 +1210,9 @@ is_live_session(ClientId) -> [] =/= emqx_cm_registry:lookup_channels(ClientId). list_client_msgs(MsgType, ClientID, QString) -> - case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of + case emqx_mgmt_api:parse_cont_pager_params(QString, pos_decoder(MsgType)) of false -> - {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}}; + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"position_limit_invalid">>}}; PagerParams = #{} -> case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of {error, not_found} -> @@ -1212,10 +1222,34 @@ list_client_msgs(MsgType, ClientID, QString) -> end end. -%% integer packet id -cont_encoding(inflight_msgs) -> ?URL_PARAM_INTEGER; -%% binary message id -cont_encoding(mqueue_msgs) -> ?URL_PARAM_BINARY. +pos_decoder(mqueue_msgs) -> fun decode_mqueue_pos/1; +pos_decoder(inflight_msgs) -> fun decode_msg_pos/1. + +encode_msgs_meta(_MsgType, #{start := StartPos, position := Pos}) -> + #{start => encode_pos(StartPos), position => encode_pos(Pos)}. + +encode_pos(none) -> + none; +encode_pos({MsgPos, PrioPos}) -> + MsgPosBin = integer_to_binary(MsgPos), + PrioPosBin = + case PrioPos of + infinity -> <<"infinity">>; + _ -> integer_to_binary(PrioPos) + end, + <>; +encode_pos(Pos) when is_integer(Pos) -> + integer_to_binary(Pos). + +-spec decode_mqueue_pos(binary()) -> {integer(), infinity | integer()}. +decode_mqueue_pos(Pos) -> + [MsgPos, PrioPos] = binary:split(Pos, <<"_">>), + {decode_msg_pos(MsgPos), decode_priority_pos(PrioPos)}. + +decode_msg_pos(Pos) -> binary_to_integer(Pos). + +decode_priority_pos(<<"infinity">>) -> infinity; +decode_priority_pos(Pos) -> binary_to_integer(Pos). max_bytes_validator(MaxBytes) when is_integer(MaxBytes), MaxBytes > 0 -> ok; @@ -1415,8 +1449,8 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) -> <<"payload">> := PayloadFmt, <<"max_payload_bytes">> := MaxBytes } = QString, - Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)), - Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)}, + Meta1 = encode_msgs_meta(MsgType, Meta), + Resp = #{meta => Meta1, data => format_msgs(MsgType, Msgs, PayloadFmt, MaxBytes)}, %% Make sure minirest won't set another content-type for self-encoded JSON response body Headers = #{<<"content-type">> => <<"application/json">>}, case emqx_utils_json:safe_encode(Resp) of @@ -1432,13 +1466,13 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) -> ?INTERNAL_ERROR(Error) end. -format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) -> +format_msgs(MsgType, [FirstMsg | Msgs], PayloadFmt, MaxBytes) -> %% Always include at least one message payload, even if it exceeds the limit - {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt), + {FirstMsg1, PayloadSize0} = format_msg(MsgType, FirstMsg, PayloadFmt), {Msgs1, _} = catch lists:foldl( fun(Msg, {MsgsAcc, SizeAcc} = Acc) -> - {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt), + {Msg1, PayloadSize} = format_msg(MsgType, Msg, PayloadFmt), case SizeAcc + PayloadSize of SizeAcc1 when SizeAcc1 =< MaxBytes -> {[Msg1 | MsgsAcc], SizeAcc1}; @@ -1450,10 +1484,11 @@ format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) -> Msgs ), lists:reverse(Msgs1); -format_msgs([], _PayloadFmt, _MaxBytes) -> +format_msgs(_MsgType, [], _PayloadFmt, _MaxBytes) -> []. format_msg( + MsgType, #message{ id = ID, qos = Qos, @@ -1462,10 +1497,10 @@ format_msg( timestamp = Timestamp, headers = Headers, payload = Payload - }, + } = Msg, PayloadFmt ) -> - Msg = #{ + MsgMap = #{ msgid => emqx_guid:to_hexstr(ID), qos => Qos, topic => Topic, @@ -1473,15 +1508,23 @@ format_msg( from_clientid => emqx_utils_conv:bin(From), from_username => maps:get(username, Headers, <<>>) }, - format_payload(PayloadFmt, Msg, Payload). + MsgMap1 = format_by_msg_type(MsgType, Msg, MsgMap), + format_payload(PayloadFmt, MsgMap1, Payload). -format_payload(none, Msg, _Payload) -> - {Msg, 0}; -format_payload(base64, Msg, Payload) -> +format_by_msg_type(mqueue_msgs, Msg, MsgMap) -> + #message{extra = #{mqueue_priority := Prio, mqueue_insert_ts := Ts}} = Msg, + MsgMap#{mqueue_priority => Prio, inserted_at => integer_to_binary(Ts)}; +format_by_msg_type(inflight_msgs, Msg, MsgMap) -> + #message{extra = #{inflight_insert_ts := Ts}} = Msg, + MsgMap#{inserted_at => integer_to_binary(Ts)}. + +format_payload(none, MsgMap, _Payload) -> + {MsgMap, 0}; +format_payload(base64, MsgMap, Payload) -> Payload1 = base64:encode(Payload), - {Msg#{payload => Payload1}, erlang:byte_size(Payload1)}; -format_payload(plain, Msg, Payload) -> - {Msg#{payload => Payload}, erlang:iolist_size(Payload)}. + {MsgMap#{payload => Payload1}, erlang:byte_size(Payload1)}; +format_payload(plain, MsgMap, Payload) -> + {MsgMap#{payload => Payload}, erlang:iolist_size(Payload)}. %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> @@ -1584,6 +1627,11 @@ client_example() -> <<"recv_msg.qos0">> => 0 }. +message_example(inflight_msgs) -> + message_example(); +message_example(mqueue_msgs) -> + (message_example())#{<<"mqueue_priority">> => 0}. + message_example() -> #{ <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>, diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index e650b802b..2f4804158 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1070,18 +1070,19 @@ t_mqueue_messages(Config) -> Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)), + IsMqueue = true, + test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue), ?assertMatch( {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api( - get, Path, "limit=10&after=not-base64%23%21", AuthHeader + get, Path, "limit=10&position=not-valid", AuthHeader ) ), ?assertMatch( {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api( - get, Path, "limit=-5&after=not-base64%23%21", AuthHeader + get, Path, "limit=-5&position=not-valid", AuthHeader ) ). @@ -1093,18 +1094,21 @@ t_inflight_messages(Config) -> Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]), InflightLimit = emqx:get_config([mqtt, max_inflight]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)), + IsMqueue = false, + test_messages( + Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue + ), ?assertMatch( {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api( - get, Path, "limit=10&after=not-int", AuthHeader + get, Path, "limit=10&position=not-int", AuthHeader ) ), ?assertMatch( {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api( - get, Path, "limit=-5&after=invalid-int", AuthHeader + get, Path, "limit=-5&position=invalid-int", AuthHeader ) ), emqtt:stop(Client). @@ -1142,19 +1146,16 @@ publish_msgs(Topic, Count) -> lists:seq(1, Count) ). -test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> +test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) -> Qs0 = io_lib:format("payload=~s", [PayloadEncoding]), {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader), #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), + #{<<"start">> := StartPos, <<"position">> := Pos} = Meta, - ?assertMatch( - #{ - <<"last">> := <<"end_of_data">>, - <<"count">> := Count - }, - Meta - ), + ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)), + ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)), ?assertEqual(length(Msgs), Count), + lists:foreach( fun({Seq, #{<<"payload">> := P} = M}) -> ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))), @@ -1165,10 +1166,12 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> <<"qos">> := _, <<"publish_at">> := _, <<"from_clientid">> := _, - <<"from_username">> := _ + <<"from_username">> := _, + <<"inserted_at">> := _ }, M - ) + ), + IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M) end, lists:zip(lists:seq(1, Count), Msgs) ), @@ -1183,62 +1186,69 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> get, Path, QsPayloadLimit, AuthHeader ), #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp), - ct:pal("~p", [FirstMsgOnly]), ?assertEqual(1, length(FirstMsgOnly)), ?assertEqual( <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding) ), Limit = 19, - LastCont = lists:foldl( - fun(PageSeq, Cont) -> - Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]), - {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader), + LastPos = lists:foldl( + fun(PageSeq, ThisPos) -> + Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]), + {ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader), #{ - <<"meta">> := #{<<"last">> := NextCont} = MetaP, - <<"data">> := MsgsP - } = emqx_utils_json:decode(MsgsRespP), - ?assertMatch(#{<<"count">> := Count}, MetaP), - ?assertNotEqual(<<"end_of_data">>, NextCont), - ?assertEqual(length(MsgsP), Limit), + <<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart}, + <<"data">> := MsgsPage + } = emqx_utils_json:decode(MsgsRespPage), + + ?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)), + %% Start position is the same in every response and points to the first msg + ?assertEqual(StartPos, ThisStart), + + ?assertEqual(length(MsgsPage), Limit), ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), ExpLastPayload = integer_to_binary(PageSeq * Limit), ?assertEqual( - ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding) + ExpFirstPayload, + decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding) ), ?assertEqual( ExpLastPayload, - decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding) + decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding) ), - NextCont + NextPos end, none, lists:seq(1, Count div 19) ), LastPartialPage = Count div 19 + 1, - LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]), + LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]), {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader), - #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode( + #{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode( MsgsRespLastP ), - ?assertEqual(<<"end_of_data">>, EmptyCont), - ?assertMatch(#{<<"count">> := Count}, MetaLastP), + %% The same as the position of all messages returned in one request + ?assertEqual(Pos, LastPartialPos), ?assertEqual( integer_to_binary(LastPartialPage * Limit - Limit + 1), - decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding) + decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding) ), ?assertEqual( integer_to_binary(Count), - decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding) + decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding) ), - ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [ - PayloadEncoding, EmptyCont, Limit + ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [ + PayloadEncoding, LastPartialPos, Limit ]), + {ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader) + #{ + <<"data">> := [], + <<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos} + }, + emqx_utils_json:decode(MsgsEmptyResp) ), %% Invalid common page params @@ -1269,6 +1279,11 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader) ). +msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) -> + <>; +msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) -> + TsBin. + decode_payload(Payload, base64) -> base64:decode(Payload); decode_payload(Payload, _) -> diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index 9731246ad..4bbc367da 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -37,6 +37,9 @@ %% Timestamp (Unit: millisecond) timestamp :: integer(), %% Miscellaneous extensions, currently used for OpenTelemetry context propagation + %% and storing mqueue/inflight insertion timestamps. + %% It was not used prior to 5.4.0 and defaulted to an empty list. + %% Must be a map now. extra = #{} :: term() }). diff --git a/changes/ce/feat-12561.en.md b/changes/ce/feat-12561.en.md index 072a71373..f3aaba55c 100644 --- a/changes/ce/feat-12561.en.md +++ b/changes/ce/feat-12561.en.md @@ -1,21 +1,20 @@ -Implement HTTP APIs to get the list of client's inflight and mqueue messages. +Implement HTTP APIs to get the list of client's in-flight and mqueue messages. To get the first chunk of data: - GET /clients/{clientid}/mqueue_messages?limit=100 - GET /clients/{clientid}/inflight_messages?limit=100 Alternatively: - - GET /clients/{clientid}/mqueue_messages?limit=100&after=none - - GET /clients/{clientid}/inflight_messages?limit=100&after=none + - GET /clients/{clientid}/mqueue_messages?limit=100&position=none + - GET /clients/{clientid}/inflight_messages?limit=100&position=none To get the next chunk of data: - - GET /clients/{clientid}/mqueue_messages?limit=100&after={last} - - GET /clients/{clientid}/inflight_messages?limit=100&after={last} + - GET /clients/{clientid}/mqueue_messages?limit=100&position={position} + - GET /clients/{clientid}/inflight_messages?limit=100&position={position} - Where {last} is a value (opaque string token) of "meta.last" field from the previous response. +Where {position} is a value (opaque string token) of "meta.position" field from the previous response. - If there is no more data, "last" = "end_of_data" is returned. - If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received. +Mqueue messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority. +By default, all messages in Mqueue have the same priority of 0. -Mqueue messages are ordered according to the queue (FIFO) order. -Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order. +In-flight messages are ordered by time at which they were inserted to the in-flight storage (from older to newer messages). diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 3175715e0..d37f52097 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -41,20 +41,22 @@ get_client_mqueue_msgs.label: """Get client mqueue messages""" get_client_inflight_msgs.desc: -"""Get client inflight messages""" +"""Get client in-flight messages""" get_client_inflight_msgs.label: -"""Get client inflight messages""" +"""Get client in-flight messages""" mqueue_msgs_list.desc: -"""Client's mqueue messages list. The queue (FIFO) ordering is preserved.""" +"""Client's mqueue messages list. +Messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority. +By default, all messages in Mqueue have the same priority of 0.""" mqueue_msgs_list.label: """Client's mqueue messages""" inflight_msgs_list.desc: -"""Client's inflight messages list. -Ordered by MQTT Packet Id, which may not represent the chronological messages order.""" +"""Client's in-flight messages list. +Messages are sorted by time at which they were inserted to the In-flight storage (from older to newer messages).""" inflight_msgs_list.label: -"""Client's inflight messages""" +"""Client's in-flight messages""" msg_id.desc: """Message ID.""" @@ -74,7 +76,7 @@ msg_topic.label: msg_publish_at.desc: """Message publish time, a millisecond precision Unix epoch timestamp.""" msg_publish_at.label: -"""Message Publish Time.""" +"""Message Publish Time""" msg_from_clientid.desc: """Message publisher's client ID.""" @@ -84,7 +86,17 @@ msg_from_clientid.desc: msg_from_username.desc: """Message publisher's username.""" msg_from_username.label: -"""Message Publisher's Username """ +"""Message Publisher's Username""" + +msg_inserted_at.desc: +"""A nanosecond precision Unix epoch timestamp at which a message was inserted to In-flight / Mqueue.""" +msg_inserted_at.label: +"""Message Insertion Time""" + +msg_mqueue_priority.desc: +"""Message Mqueue Priority.""" +msg_mqueue_priority.label: +"""Message Mqueue Priority""" subscribe.desc: """Subscribe"""